首页 > 其他分享 >DHTServer

DHTServer

时间:2023-03-27 22:45:57浏览次数:30  
标签:String get DHTServer id new 节点 请求

/**
 * 每个DHTServer对应多个本地DHT节点, 每个本地DHT节点监听一个端口, 每个DHTServer对象都有一个工作线程worker,
 * 这个线程负责当前DHTServer对象维护的所有节点的数据读写及逻辑处理, 这些操作都是非阻塞的,
 * 因此在多核处理器上创建和处理器数量相同的DHTServer对象就能最大限度利用CPU资源 每个DHTServer之间都是相互独立的,
 * 不能存在数据共享和争用, 由于每个DHTServer内部的所有操作都是单线程, 所以所有操作都不需要进行线程同步, 最大限度消除同步控制的开销
 * 
 * @author dgqjava
 *
 */
@Slf4j
public class DHTServer extends Thread {

    // private volatile boolean stop = false;
//     private Channel channel;
//     private ConnectionlessBootstrap b;

    /**
     * 自动重新加入DHT网络 timer
     * 
     */
    // private Timer autoRejoinDHTTimer;
    private LookupService ls = null;
    private Map<String, String> allowedIp = null;
    private OnGetPeersListener onGetPeersListener = null;
    private OnAnnouncePeerListener onAnnouncePeerListener = null;
    // DHT根节点, 这是几个长期稳定的公用节点, 通过这几个节点查找其他节点来初始化路由表
    private static final NodeInfo[] ROOT_NODES;
    // 用于存储待处理的AnnouncePeerData的数据
    private final Queue<AnnouncePeerData> announcePeerData = new LinkedList<>();
    // 这里面是当前DHTServer对象管理的本地的DHT节点
    private final List<LocalDHTNode> localDHTNodes = new ArrayList<>();
    // 这个线程负责处理所有的操作, 包括 :
    // NIO数据读写,
    // 定时处理外部节点发送过来的请求数据并响应,
    // 定时发起查找find_node请求来更新本地路由表及让更多的其他节点认识我们
    private ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
    // private ConcurrentHashMap<String, ScheduledFuture<?>> taskMap = new
    // ConcurrentHashMap<>(4);

    // 配置文件
    Prop prop = PropKit.use("crawler.properties");
    final int pNewNodes = prop.getInt("main.dhtserver.max.newnodes");
    final int pOldNodes = prop.getInt("main.dhtserver.max.oldnodes");
    private final Map<String, LocalDHTNode> id2LocalDHTNode = new HashMap<>(); // 本地的nodeId和本地DHT节点的映射表
    private final NIOHelper nioHelper = new NIOHelper(worker); // NIOHelper类里封装了所有NIO操作,
                                                                // 这样在DHTServer里只需要负责处理其他节点的请求数据和发送find_node等业务处理的逻辑而不需要关注NIO相关的所有代码
    static {
        try {
            ROOT_NODES = new NodeInfo[] { // 初始化DHT根节点
                    new NodeInfo(InetAddress.getByName("router.utorrent.com").getHostAddress(), 6881, null),
                    new NodeInfo(InetAddress.getByName("dht.transmissionbt.com").getHostAddress(), 6881, null),
                    new NodeInfo(InetAddress.getByName("router.bittorrent.com").getHostAddress(), 6881, null) };
        } catch (Exception e) {
            log.info("ERROR:ROOT_NODES");
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建一个DHTServer对象并监听到多个端口, 每个端口关联一个唯一的nodeId
     * 
     * @param ports 端口列表
     * @param ids   nodeId列表
     */
    public DHTServer(Integer ports, String ids) {
        try {
            Map<Object, Object> esmap = PropertiesUtil.readMulProperties("/application.properties");
            String p = esmap.get("dbconf_path").toString().concat("/dbConf/GeoIP.dat");

            ls = new LookupService(p, LookupService.GEOIP_MEMORY_CACHE);

            // 监听指定的端口, 并初始化数据
//            for (int i = 0; i < ids.size(); i++) {
//                String id = ids.get(i);
//                int port = ports.get(i);

            // 为当前节点创建一个路由表, 这里的192.168.0.1可以是任意一个合法的ip地址, 只是为了NodeInfo的构造方法不报NPE
            RoutingList routingList = new RoutingList(new NodeInfo("192.168.0.1", ports, ids));

            // 所有未请求或者短时间内未重复请求的节点放到这个栈空间内,
            // 最新得到的节点信息放到栈顶, 优先向最新的节点发送find_node请求, 因为最新的节点是活动的概率最大,
            // 如果栈内节点数超过10000, 则将他减半防止内存占用过多, 如果栈内节点数为0, 则将根节点重新加入栈, 开始新的一轮查找,
            // 这样确保我们的节点一直在不停地 发送find_node来认识更多的其他节点
            Stack<NodeInfo> newNodes = new Stack<>();
            for (NodeInfo nodeInfo : ROOT_NODES) {
                newNodes.push(nodeInfo);
            }

            // 我们本地的DHT节点
            LocalDHTNode dhtNode = new LocalDHTNode(ids, newNodes, routingList);

            // 添加一个端口监听
            nioHelper.bind(ports, ids);
            // 添加id和本地节点映射关系
            id2LocalDHTNode.put(ids, dhtNode);

            // 添加新的本地节点到本地节点列表
            localDHTNodes.add(dhtNode);

            // }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (RuntimeException e) {
            e.printStackTrace();
        }

        // 每隔10毫秒通过nioHelper对象从队列中读取要处理的数据, 这些数据是外部节点请求或者回复我们的数据
        worker.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (Main.me.isShutdown) {
                        sleep(20000);
                        System.out.println("nioHelper waiting........................");
                        return;
                    }

                    // 每隔10毫秒都会把队列里要处理的数据耗尽, 由于所有操作都是非阻塞且单线程的, 所以这些操作都会非常快速的完成, 不会影响其他功能
                    // while (!nioHelper.getReadDataQueue().isEmpty()) {
                    if (!nioHelper.getReadDataQueue().isEmpty()) {
                        try {
                            // 获取待处理的数据
                            NIOHelper.ReadData readData = nioHelper.getReadDataQueue().poll();
                            InetSocketAddress remoteAddress = (InetSocketAddress) readData.getRemoteAddress();
                            LocalDHTNode localDHTNode = id2LocalDHTNode.get(readData.getId());
                            Stack<NodeInfo> newNodes = localDHTNode.getNewNodes();
                            Set<String> oldNodes = localDHTNode.getOldNodes();
                            RoutingList routingList = localDHTNode.getRoutingList();
                            // 将收到的数据转化为B编码字典对象
                            if (readData.getData() == null)
                                return;
                            BencodeMap response = BencodeMap
                                    .getMap(new String(readData.getData(), CharsetName.IOS_8859_1), 0);
                            // 当y为r时为其他节点对我们请求的回复, y为q时为其他节点对我们的请求, y为e时表示错误, 我们这里只处理请求和回复两种数据
                            if (response == null)
                                return;
                            byte[] b = response.get(new BencodeString("y")).getData();
                            if (b == null) {
                                System.out.println("ERRCODE:008:response2 is null~~~~~~~~~~~~~~~~~~~~~~~~~");
                                return;
                            }
                            String y = new String(b, CharsetName.IOS_8859_1);
                            if (y.equals("r")) {
                                // 获取数据来源方的nodeId
                                String remoteId = new String(((BencodeMap) (response.get(new BencodeString("r"))))
                                        .get(new BencodeString("id")).getData(), CharsetName.IOS_8859_1);

                                // 将请求发送方的信息保存到路由表, 因为所有对我们发送请求或者响应请求的节点肯定都是活跃节点, 路由表中的桶使用LUR算法实现, 直接替换掉最旧的节点
                                routingList.addNode(new NodeInfo(remoteAddress.getAddress().getHostAddress(),
                                        remoteAddress.getPort(), remoteId));

                                // 获取响应的的节点列表并解析节点信息, 每个节点信息长度为26, 包括20字节的节点id, 4字节的ip, 2字节的端口,
                                // 由于我们只发送find_node请求, 因此所有响应类型的数据都是对find_node的响应
                                if (new BencodeString("nodes") != null) {
                                    byte[] nodes = ((BencodeMap) (response.get(new BencodeString("r"))))
                                            .get(new BencodeString("nodes")).getData();
                                    for (int i = 0; i < nodes.length; i += 26) {
                                        // 获取节点id, 端口和ip
                                        String nodeId = new String(Arrays.copyOfRange(nodes, i, i + 20),
                                                CharsetName.IOS_8859_1);
                                        byte[] ipPort = Arrays.copyOfRange(nodes, i + 20, i + 26);
                                        String ip = (ipPort[0] & 0xFF) + "." + (ipPort[1] & 0xFF) + "."
                                                + (ipPort[2] & 0xFF) + "." + (ipPort[3] & 0xFF);
                                        int port = ByteBuffer.wrap(new byte[] { 0, 0, ipPort[4], ipPort[5] }).getInt();

                                        // 判断得到的节点信息是否短期内已经请求过, 这里简单的用ip+端口来判断, 如果没有请求过则加入到新节点栈中
                                        boolean isContain = oldNodes
                                                .contains(new String(ipPort, CharsetName.IOS_8859_1));
                                        if (!isContain) {
                                            // byte[] nid = new byte[20];
                                            // System.arraycopy(nodes, i, nid, 0, 20);
                                            newNodes.push(new NodeInfo(ip, port, nodeId));
                                        }
                                    }
                                }
                                return;
                            }

                            // y为q时的数据是其他节点对我们的请求, 需要对其他节点的请求进行响应
                            if (y.equals("q")) {
                                // 获取数据来源方的nodeId
                                String remoteId = new String(((BencodeMap) (response.get(new BencodeString("a"))))
                                        .get(new BencodeString("id")).getData(), CharsetName.IOS_8859_1);

                                // 将请求发送方的信息保存到路由表, 因为所有对我们发送请求或者响应请求的节点肯定都是活跃节点, 路由表中的桶使用LUR算法实现, 直接替换掉最旧的节点
                                routingList.addNode(new NodeInfo(remoteAddress.getAddress().getHostAddress(),
                                        remoteAddress.getPort(), remoteId));

                                // 获取数据来源方的transactionId, 对请求进行响应时必须带上请求中的transactionId, 这样对方才知道这个响应针对的是哪个请求
                                String t = new String(response.get(new BencodeString("t")).getData(),
                                        CharsetName.IOS_8859_1);

                                // 获取请求类型, 请求类型一共四种 : ping, find_node, get_peers, announce_peer
                                String q = new String(response.get(new BencodeString("q")).getData(),
                                        CharsetName.IOS_8859_1);

                                // 处理ping请求, 这个是对方检查我们节点是否存活
                                if (q.equals("ping")) {
                                    responsePing(readData.getId(), remoteAddress, t);
                                    return;
                                }

                                // 处理find_node请求, 这个是对方向我们节点查找距离目标节点最近的8个节点
                                if (q.equals("find_node")) {
                                    // 获取要查找的目标节点id
                                    String target = new String(
                                            ((BencodeMap) (response.get(new BencodeString("a"))))
                                                    .get(new BencodeString("target")).getData(),
                                            CharsetName.IOS_8859_1);
                                    // 获取距离目标节点id最近的八个节点信息并响应
                                    responseFindNode(readData.getId(), remoteAddress, t, routingList
                                            .getNearestNodes(new NodeId(target.getBytes(CharsetName.IOS_8859_1))));
                                    return;
                                }

                                // 处理get_peers请求, 这个是对方向我们查询可以下载到指定磁力链接数据的节点信息, 如果我们没有可下载的节点则提供最近的8个节点返回,
                                // 因为我们不保存peer的信息, 所以我们这里每次都只返回最近的八个节点
                                if (q.equals("get_peers")) {
                                    // 获取infoHash, 这个就能转化为磁力链接, get_peers中得到的infoHash有可能是无效的
                                    String infoHash = new String(
                                            ((BencodeMap) (response.get(new BencodeString("a"))))
                                                    .get(new BencodeString("info_hash")).getData(),
                                            CharsetName.IOS_8859_1);

                                    // 转化为磁力链接并打印
                                    // printMagnet(infoHash, "get_peers");
                                    // 获取距离infoHash最近的八个节点信息并响应
                                    byte[] tid = getNeighbor(localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),
                                            readData.getId().getBytes(CharsetName.IOS_8859_1));
                                    responseGetPeers(localDHTNode.getId(), tid, remoteAddress, t, routingList
                                            .getNearestNodes(new NodeId(infoHash.getBytes(CharsetName.IOS_8859_1))));
                                    if (onGetPeersListener != null) {
                                        announcePeerData.add(new AnnouncePeerData(remoteAddress,
                                                ((BencodeMap) (response.get(new BencodeString("a"))))
                                                        .get(new BencodeString("info_hash")).getData(),
                                                remoteAddress.getPort()));
//                                        onGetPeersListener.onGetPeers(remoteAddress,
//                                                ((BencodeMap) (response.get(new BencodeString("a"))))
//                                                        .get(new BencodeString("info_hash")).getData());
                                    }
                                    return;
                                }

                                // 处理announce_peer请求, 这个是对方告诉我们他已经找到了文件的下载地址, 我们这里只获取他正在下载的磁力链接,
                                // 这里获取的磁力链接基本上是有效的
                                if (q.equals("announce_peer")) {
                                    // 获取infoHash
//                                    String infoHash = new String(((BencodeMap) (response.get(new BencodeString("a"))))
//                                            .get(new BencodeString("info_hash")).getData(), CharsetName.IOS_8859_1);

                                    // 转化为磁力链接并打印
                                    // printMagnet(infoHash, "announce_peer");
                                    // 响应对方的announce_peer请求
                                    byte[] tid = getNeighbor(localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),
                                            readData.getId().getBytes(CharsetName.IOS_8859_1));
                                    responseAnnouncePeer(readData.getId(), tid, remoteAddress, t);
                                    if (onAnnouncePeerListener != null) {
                                        announcePeerData.add(new AnnouncePeerData(remoteAddress,
                                                ((BencodeMap) (response.get(new BencodeString("a"))))
                                                        .get(new BencodeString("info_hash")).getData(),
                                                remoteAddress.getPort()));
//                                        onAnnouncePeerListener.onAnnouncePeer(remoteAddress,
//                                                ((BencodeMap) (response.get(new BencodeString("a"))))
//                                                        .get(new BencodeString("info_hash")).getData(),
//                                                remoteAddress.getPort());
                                    }
                                    return;
                                }
                            }

                        } catch (Exception ex) {
                            ex.printStackTrace();
                            log.error(ex.getMessage());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error(e.getMessage());
                }
            }
        }, 0, 1, TimeUnit.MILLISECONDS);

        // 每秒对当前DHTServer维护的所有本地节点执行一次find_node操作, 让更多的节点认识我们, 这样才能有更多的节点发送磁力链接到我们节点
        worker.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (Main.me.isShutdown) {
                        sleep(20000);
                        // System.out.println("find_node waiting...........................");
                        return;
                    }

                    // 遍历本地节点
                    for (LocalDHTNode localDHTNode : localDHTNodes) {
                        // LocalDHTNode localDHTNode = localDHTNodes.get(localDHTNodesIng);

                        Stack<NodeInfo> newNodes = localDHTNode.getNewNodes();
                        // System.out.println("遍历本地节点"+newNodes.size());
                        Set<String> oldNodes = localDHTNode.getOldNodes();
                        String id = localDHTNode.getId();
                        // 如果新节点列表为空, 则重新加入根节点, 重新开始查找
                        if (newNodes.isEmpty()) {
                            // System.out.println("CODE:001:如果新节点列表为空, 则重新加入根节点, 重新开始查找");
                            newNodes.push(ROOT_NODES[0]);
                            newNodes.push(ROOT_NODES[1]);
                            newNodes.push(ROOT_NODES[2]);
                            log.info("CODE:001:如果新节点列表为空, 则重新加入根节点, 重新开始查找");
                        }
                        // 如果新节点数量过多, 防止占用过多内存, 移除一半的最旧节点, 这里用栈实现, 每次移除栈底的元素
                        if (newNodes.size() > pNewNodes) {
                            int count = newNodes.size() / 2;
                            for (int i = 0; i < count; i++) {
                                newNodes.remove(0);
                            }
                        }
                        // 获取最新的节点, 用来发送find_node请求
                        NodeInfo latestNode = newNodes.pop();
                        // 获取最新节点的ip和端口
                        InetSocketAddress target = new InetSocketAddress(latestNode.getIp(), latestNode.getPort());
                        // 发送find_node请求
                        // latestNode.getId().getData()
                        // findNode(localDHTNode.getId(), target, randomTargetId(id));
                        byte[] nid = null;
                        if (latestNode.getId() != null) {
                            nid = getNeighbor(localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),
                                    latestNode.getId().getData(CharsetName.IOS_8859_1));
                            findNode(localDHTNode.getId(), nid, target, randomTargetId(id));
                        } else {
                            findNode(localDHTNode.getId(), localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),
                                    target, randomTargetId(id));
                        }
                        // 将请求过的节点加入到旧节点列表中
                        byte[] ipData = latestNode.getIpData();
                        byte[] portData = latestNode.getPortData();
                        oldNodes.add(new String(
                                new byte[] { ipData[0], ipData[1], ipData[2], ipData[3], portData[0], portData[1] },
                                CharsetName.IOS_8859_1));

                        // 如果旧节点数量过多, 防止占用过多内存, 移除一半最旧的旧节点, 这里的oldNodes是用LinkedHashSet实现, 因此按顺序移除将先移除最旧的
                        if (oldNodes.size() > pOldNodes) {
                            int count = oldNodes.size() / 2;
                            for (Iterator<String> it = oldNodes.iterator(); it.hasNext() && count-- > 0; it.remove()) {
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error(e.getMessage());
                    e.printStackTrace();
                }
            }
        }, 0, 1, TimeUnit.SECONDS);

        // 每2毫秒对AnnouncePeerData 进行下载
        worker.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (Main.me.isShutdown) {
                        sleep(20000);
                        System.out.println("AnnouncePeerData waiting...........................");
                        return;
                    }
                    AnnouncePeerData querData = getAnnouncePeerData().poll();
                    if (querData != null) {

                        onAnnouncePeerListener.onAnnouncePeer(querData.getAddress(), querData.getInfo_hash(),
                                querData.getPort());
                    }
                } catch (Exception e) {
                    log.error("DHTServer:AnnouncePeerData-" + e.getMessage());
                    e.printStackTrace();
                }
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
    }

    /**
     * 发送findNode请求
     * 
     * @param id       本地节点的id
     * @param nid      違裝本地節點NID
     * @param target   目标节点的地址
     * @param targetId 目标节点的id
     */
    public void findNode(String id, byte[] nid, InetSocketAddress target, String targetId) {
        try {
            // 发送请求时候需要带一个transactionId, 这里生成两个字节的随机字符
            ByteBuffer bb = ByteBuffer.allocate(2).putShort((short) new Random().nextInt(Short.MAX_VALUE));
            bb.flip();
            String transactionId = new String(bb.array(), CharsetName.IOS_8859_1);

            // 请求数据放入到队列中等待异步发送
            nioHelper.write(id,
                    new NIOHelper.WriteData(DHTHelper.getFindNodeData(transactionId, nid, targetId), target));
        } catch (Exception e) {
            // throw new RuntimeException(e);
            e.printStackTrace();
        }
    }

    /**
     * 响应ping请求
     * 
     * @param id     本地节点的id
     * @param target 目标节点的地址
     * @param t      目标节点请求时发送过来的transactionId
     */
    public void responsePing(String id, InetSocketAddress target, String t) {
        // 响应数据放入到队列中等待异步发送
        nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getPingResponseData(t, id), target));

    }

    /**
     * 响应find_node请求
     * 
     * @param id     本地节点的id
     * @param target 目标节点的地址
     * @param t      目标节点请求时发送过来的transactionId
     * @param nodes  回复给目标节点的最近的八个节点信息
     */
    public void responseFindNode(String id, InetSocketAddress target, String t, List<NodeInfo> nodes) {
        // 响应数据放入到队列中等待异步发送
        nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getFindNodeResponseData(t, id, nodes), target));
    }

    /**
     * 响应get_peers请求
     * 
     * @param id     本地节点的id
     * @param target 目标节点的地址
     * @param t      目标节点请求时发送过来的transactionId
     * @param nodes  回复给目标节点的最近的八个节点信息
     */
    public void responseGetPeers(String id, byte[] bid, InetSocketAddress target, String t, List<NodeInfo> nodes) {
        // 响应数据放入到队列中等待异步发送
        // if(isAllowedIp(target.getHostString()))
        nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getGetPeersResponseData(t, id, bid, nodes), target));

    }

    /**
     * 响应announce_peer请求
     * 
     * @param id     本地节点的id
     * @param target 目标节点的地址
     * @param t      目标节点请求时发送过来的transactionId
     */
    public void responseAnnouncePeer(String id, byte[] bid, InetSocketAddress target, String t) {
        // 响应数据放入到队列中等待异步发送
        // if(isAllowedIp(target.getHostString()))
        nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getAnnouncePeerResponseData(t, id, bid), target));
    }

    /**
     * 打印得到的磁力链接
     * 
     * @param infoHash
     * @param source   获得磁力链接的来源 : 其他节点的get_peers和announce_peer请求
     */
    @SuppressWarnings("deprecation")
    private static void printMagnet(String infoHash, String source) {
        StringBuilder magnet = new StringBuilder(new Date().toLocaleString()).append(" magnet:?xt=urn:btih:");

        // infoHash的长度为20字节, 每个字节的值转为16进制后就是磁力链接
        for (char c : infoHash.toCharArray()) {
            String hs = Integer.toHexString(c);
            // 如果转为16进制后的长度不足2位则第一位补0
            if (hs.length() == 1) {
                magnet.append(0);
            }
            magnet.append(hs);
        }
        System.out.println(source + " " + magnet);
    }

    /**
     * 根据一个节点id随机获取一个目标节点id, 如果每次都只根据自身的节点id进行find_node操作很容易就会遍历完所有的节点导致无节点可遍历,
     * 因此这里根据节点的距离按照一定概率生成一个随机的目标节点id, 距离越远的目标节点id生成的概率越小
     * 
     * @param id 本地节点的id
     * @return
     */
    private static String randomTargetId(String id) {
        int i = ThreadLocalRandom.current().nextInt();
        // 有百分之一的概率产生一个基本完全随机的节点id
        if (i % 100 == 0) {
            return (char) (i % 256) + UUID.randomUUID().toString().substring(0, 19);
        }

        int i2 = ThreadLocalRandom.current().nextInt();
        // 十六分之一的概率修改原本节点id的最后一个字节, 修改的字节越靠后, 被其他节点保存的概率越大
        if ((i & 0B1111) == (i2 & 0B1111)) {
            char[] cs = id.toCharArray();
            cs[cs.length - 1] = (char) (i & 0B011111111);
            return new String(cs);
        }
        return id;
    }

    /**
     * getNeighbor
     * 
     * @param local_info_hash 本地节点的id
     * @param info_hash       目標節點ID
     */
    private byte[] getNeighbor(byte[] local_info_hash, byte[] info_hash) {
        byte[] bytes = new byte[20];
        System.arraycopy(info_hash, 0, bytes, 0, 10);
        System.arraycopy(local_info_hash, 10, bytes, 10, 10);
        return bytes;
    }

    private boolean isAllowedIp(String ip) {
        if (ls == null || allowedIp == null)
            return true;
        if (ls != null && allowedIp != null && !allowedIp.containsKey(ls.getCountry(ip).getCode())) {
            return false;
        }
        return true;
    }

    public static byte[] createRandomNodeId() {
        Random random = new Random();
        byte[] r = new byte[20];
        random.nextBytes(r);
        return r;
    }

    public void setOnGetPeersListener(OnGetPeersListener onGetPeersListener) {
        this.onGetPeersListener = onGetPeersListener;
    }

    public void setOnAnnouncePeerListener(OnAnnouncePeerListener onAnnouncePeerListener) {
        this.onAnnouncePeerListener = onAnnouncePeerListener;
    }

    public void stopAll() {
        this.interrupt();
//        if (channel != null)
//            channel.close().awaitUninterruptibly();
//        if (b != null)
//            b.releaseExternalResources();

        nioHelper.stopAll();
        if (worker != null) {
            worker.shutdown();
            System.out.println("worker.shutdown();........................");
            try {
                worker.awaitTermination(10, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                worker.shutdownNow();
                e.printStackTrace();
                System.out.println("worker.shutdownNow();........................");
            }
        }
    }

    public Map<String, String> getAllowedIp() {
        return allowedIp;
    }

    public void setAllowedIp(Map<String, String> allowedIp) {
        this.allowedIp = allowedIp;
    }

    public Queue<AnnouncePeerData> getAnnouncePeerData() {
        return announcePeerData;
    }
}

 

package com.cilij.dhtcrawler.server;

 

import java.io.IOException;

import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Date;

import java.util.HashMap;

import java.util.Iterator;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.Queue;

import java.util.Random;

import java.util.Set;

import java.util.Stack;

import java.util.UUID;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.TimeUnit;

 

importorg.apache.commons.lang.StringUtils;

 

import com.cilij.dhtcrawler.bencoding.BencodeMap;

import com.cilij.dhtcrawler.bencoding.BencodeString;

import com.cilij.dhtcrawler.bencoding.CharsetName;

import com.cilij.dhtcrawler.common.DHTHelper;

import com.cilij.dhtcrawler.common.LocalDHTNode;

import com.cilij.dhtcrawler.common.NIOHelper;

import com.cilij.dhtcrawler.common.NIOHelper.AnnouncePeerData;

import com.cilij.dhtcrawler.listener.OnAnnouncePeerListener;

import com.cilij.dhtcrawler.listener.OnGetPeersListener;

import com.cilij.dhtcrawler.main.Main;

import com.cilij.dhtcrawler.nodes.NodeId;

import com.cilij.dhtcrawler.nodes.NodeInfo;

import com.cilij.dhtcrawler.routingtable.RoutingList;

importcom.cilij.dhtcrawler.task.InWarehousTask;

import com.cilij.es.util.PropertiesUtil;

import com.jfinal.kit.Prop;

import com.jfinal.kit.PropKit;

import com.maxmind.geoip.LookupService;

 

import lombok.extern.slf4j.Slf4j;

 

/**

* 每个DHTServer对应多个本地DHT节点, 每个本地DHT节点监听一个端口, 每个DHTServer对象都有一个工作线程worker,

* 这个线程负责当前DHTServer对象维护的所有节点的数据读写及逻辑处理, 这些操作都是非阻塞的,

* 因此在多核处理器上创建和处理器数量相同的DHTServer对象就能最大限度利用CPU资源 每个DHTServer之间都是相互独立的,

* 不能存在数据共享和争用, 由于每个DHTServer内部的所有操作都是单线程, 所以所有操作都不需要进行线程同步, 最大限度消除同步控制的开销

*

* @authordgqjava

*

*/

@Slf4j

publicclass DHTServer extends Thread {

 

// private volatile boolean stop = false;

// private Channel channel;

// private ConnectionlessBootstrap b;

 

/**

* 自动重新加入DHT网络 timer

*

*/

// private Timer autoRejoinDHTTimer;

private LookupService ls = null;

private Map<String, String> allowedIp = null;

private OnGetPeersListener onGetPeersListener = null;

private OnAnnouncePeerListener onAnnouncePeerListener = null;

// DHT根节点, 这是几个长期稳定的公用节点, 通过这几个节点查找其他节点来初始化路由表

privatestaticfinal NodeInfo[] ROOT_NODES;

// 用于存储待处理的AnnouncePeerData的数据

privatefinal Queue<AnnouncePeerData> announcePeerData = new LinkedList<>();

// 这里面是当前DHTServer对象管理的本地的DHT节点

privatefinal List<LocalDHTNode> localDHTNodes = new ArrayList<>();

// 这个线程负责处理所有的操作, 包括 :

// NIO数据读写,

// 定时处理外部节点发送过来的请求数据并响应,

// 定时发起查找find_node请求来更新本地路由表及让更多的其他节点认识我们

private ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

// private ConcurrentHashMap<String, ScheduledFuture<?>> taskMap = new

// ConcurrentHashMap<>(4);

 

// 配置文件

Prop prop = PropKit.use("crawler.properties");

finalintpNewNodes = prop.getInt("main.dhtserver.max.newnodes");

finalintpOldNodes = prop.getInt("main.dhtserver.max.oldnodes");

privatefinal Map<String, LocalDHTNode> id2LocalDHTNode = new HashMap<>(); // 本地的nodeId和本地DHT节点的映射表

privatefinal NIOHelper nioHelper = new NIOHelper(worker); // NIOHelper类里封装了所有NIO操作,

// 这样在DHTServer里只需要负责处理其他节点的请求数据和发送find_node等业务处理的逻辑而不需要关注NIO相关的所有代码

static {

try {

ROOT_NODES = new NodeInfo[] { // 初始化DHT根节点

new NodeInfo(InetAddress.getByName("router.utorrent.com").getHostAddress(), 6881, null),

new NodeInfo(InetAddress.getByName("dht.transmissionbt.com").getHostAddress(), 6881, null),

new NodeInfo(InetAddress.getByName("router.bittorrent.com").getHostAddress(), 6881, null) };

} catch (Exception e) {

log.info("ERROR:ROOT_NODES");

thrownew RuntimeException(e);

}

}

 

/**

* 创建一个DHTServer对象并监听到多个端口, 每个端口关联一个唯一的nodeId

*

* @paramports 端口列表

* @param ids nodeId列表

*/

public DHTServer(Integer ports, String ids) {

try {

Map<Object, Object> esmap = PropertiesUtil.readMulProperties("/application.properties");

String p = esmap.get("dbconf_path").toString().concat("/dbConf/GeoIP.dat");

 

ls = new LookupService(p, LookupService.GEOIP_MEMORY_CACHE);

 

// 监听指定的端口, 并初始化数据

// for (int i = 0; i < ids.size(); i++) {

// String id = ids.get(i);

// int port = ports.get(i);

 

// 为当前节点创建一个路由表, 这里的192.168.0.1可以是任意一个合法的ip地址, 只是为了NodeInfo的构造方法不报NPE

RoutingList routingList = new RoutingList(new NodeInfo("192.168.0.1", ports, ids));

 

// 所有未请求或者短时间内未重复请求的节点放到这个栈空间内,

// 最新得到的节点信息放到栈顶, 优先向最新的节点发送find_node请求, 因为最新的节点是活动的概率最大,

// 如果栈内节点数超过10000, 则将他减半防止内存占用过多, 如果栈内节点数为0, 则将根节点重新加入栈, 开始新的一轮查找,

// 这样确保我们的节点一直在不停地 发送find_node来认识更多的其他节点

Stack<NodeInfo> newNodes = new Stack<>();

for (NodeInfo nodeInfo : ROOT_NODES) {

newNodes.push(nodeInfo);

}

 

// 我们本地的DHT节点

LocalDHTNode dhtNode = new LocalDHTNode(ids, newNodes, routingList);

 

// 添加一个端口监听

nioHelper.bind(ports, ids);

// 添加id和本地节点映射关系

id2LocalDHTNode.put(ids, dhtNode);

 

// 添加新的本地节点到本地节点列表

localDHTNodes.add(dhtNode);

 

// }

} catch (IOException e) {

e.printStackTrace();

} catch (RuntimeException e) {

e.printStackTrace();

}

 

// 每隔10毫秒通过nioHelper对象从队列中读取要处理的数据, 这些数据是外部节点请求或者回复我们的数据

worker.scheduleAtFixedRate(new Runnable() {

@Override

publicvoid run() {

try {

if (Main.me.isShutdown) {

sleep(20000);

System.out.println("nioHelper waiting........................");

return;

}

 

// 每隔10毫秒都会把队列里要处理的数据耗尽, 由于所有操作都是非阻塞且单线程的, 所以这些操作都会非常快速的完成, 不会影响其他功能

// while (!nioHelper.getReadDataQueue().isEmpty()) {

if (!nioHelper.getReadDataQueue().isEmpty()) {

try {

// 获取待处理的数据

NIOHelper.ReadData readData = nioHelper.getReadDataQueue().poll();

InetSocketAddress remoteAddress = (InetSocketAddress) readData.getRemoteAddress();

LocalDHTNode localDHTNode = id2LocalDHTNode.get(readData.getId());

Stack<NodeInfo> newNodes = localDHTNode.getNewNodes();

Set<String> oldNodes = localDHTNode.getOldNodes();

RoutingList routingList = localDHTNode.getRoutingList();

// 将收到的数据转化为B编码字典对象

if (readData.getData() == null)

return;

BencodeMap response = BencodeMap

.getMap(new String(readData.getData(), CharsetName.IOS_8859_1), 0);

// 当y为r时为其他节点对我们请求的回复, y为q时为其他节点对我们的请求, y为e时表示错误, 我们这里只处理请求和回复两种数据

if (response == null)

return;

byte[] b = response.get(new BencodeString("y")).getData();

if (b == null) {

System.out.println("ERRCODE:008:response2 is null~~~~~~~~~~~~~~~~~~~~~~~~~");

return;

}

String y = new String(b, CharsetName.IOS_8859_1);

if (y.equals("r")) {

// 获取数据来源方的nodeId

String remoteId = new String(((BencodeMap) (response.get(new BencodeString("r"))))

.get(new BencodeString("id")).getData(), CharsetName.IOS_8859_1);

 

// 将请求发送方的信息保存到路由表, 因为所有对我们发送请求或者响应请求的节点肯定都是活跃节点, 路由表中的桶使用LUR算法实现, 直接替换掉最旧的节点

routingList.addNode(new NodeInfo(remoteAddress.getAddress().getHostAddress(),

remoteAddress.getPort(), remoteId));

 

// 获取响应的的节点列表并解析节点信息, 每个节点信息长度为26, 包括20字节的节点id, 4字节的ip, 2字节的端口,

// 由于我们只发送find_node请求, 因此所有响应类型的数据都是对find_node的响应

if (new BencodeString("nodes") != null) {

byte[] nodes = ((BencodeMap) (response.get(new BencodeString("r"))))

.get(new BencodeString("nodes")).getData();

for (inti = 0; i < nodes.length; i += 26) {

// 获取节点id, 端口和ip

String nodeId = new String(Arrays.copyOfRange(nodes, i, i + 20),

CharsetName.IOS_8859_1);

byte[] ipPort = Arrays.copyOfRange(nodes, i + 20, i + 26);

String ip = (ipPort[0] & 0xFF) + "." + (ipPort[1] & 0xFF) + "."

+ (ipPort[2] & 0xFF) + "." + (ipPort[3] & 0xFF);

intport = ByteBuffer.wrap(newbyte[] { 0, 0, ipPort[4], ipPort[5] }).getInt();

 

// 判断得到的节点信息是否短期内已经请求过, 这里简单的用ip+端口来判断, 如果没有请求过则加入到新节点栈中

booleanisContain = oldNodes

.contains(new String(ipPort, CharsetName.IOS_8859_1));

if (!isContain) {

// byte[] nid = new byte[20];

// System.arraycopy(nodes, i, nid, 0, 20);

newNodes.push(new NodeInfo(ip, port, nodeId));

}

}

}

return;

}

 

// y为q时的数据是其他节点对我们的请求, 需要对其他节点的请求进行响应

if (y.equals("q")) {

// 获取数据来源方的nodeId

String remoteId = new String(((BencodeMap) (response.get(new BencodeString("a"))))

.get(new BencodeString("id")).getData(), CharsetName.IOS_8859_1);

 

// 将请求发送方的信息保存到路由表, 因为所有对我们发送请求或者响应请求的节点肯定都是活跃节点, 路由表中的桶使用LUR算法实现, 直接替换掉最旧的节点

routingList.addNode(new NodeInfo(remoteAddress.getAddress().getHostAddress(),

remoteAddress.getPort(), remoteId));

 

// 获取数据来源方的transactionId, 对请求进行响应时必须带上请求中的transactionId, 这样对方才知道这个响应针对的是哪个请求

String t = new String(response.get(new BencodeString("t")).getData(),

CharsetName.IOS_8859_1);

 

// 获取请求类型, 请求类型一共四种 : ping, find_node, get_peers, announce_peer

String q = new String(response.get(new BencodeString("q")).getData(),

CharsetName.IOS_8859_1);

 

// 处理ping请求, 这个是对方检查我们节点是否存活

if (q.equals("ping")) {

responsePing(readData.getId(), remoteAddress, t);

return;

}

 

// 处理find_node请求, 这个是对方向我们节点查找距离目标节点最近的8个节点

if (q.equals("find_node")) {

// 获取要查找的目标节点id

String target = new String(

((BencodeMap) (response.get(new BencodeString("a"))))

.get(new BencodeString("target")).getData(),

CharsetName.IOS_8859_1);

// 获取距离目标节点id最近的八个节点信息并响应

responseFindNode(readData.getId(), remoteAddress, t, routingList

.getNearestNodes(new NodeId(target.getBytes(CharsetName.IOS_8859_1))));

return;

}

 

// 处理get_peers请求, 这个是对方向我们查询可以下载到指定磁力链接数据的节点信息, 如果我们没有可下载的节点则提供最近的8个节点返回,

// 因为我们不保存peer的信息, 所以我们这里每次都只返回最近的八个节点

if (q.equals("get_peers")) {

// 获取infoHash, 这个就能转化为磁力链接, get_peers中得到的infoHash有可能是无效的

String infoHash = new String(

((BencodeMap) (response.get(new BencodeString("a"))))

.get(newBencodeString("info_hash")).getData(),

CharsetName.IOS_8859_1);

 

// 转化为磁力链接并打印

// printMagnet(infoHash, "get_peers");

// 获取距离infoHash最近的八个节点信息并响应

byte[] tid = getNeighbor(localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),

readData.getId().getBytes(CharsetName.IOS_8859_1));

responseGetPeers(localDHTNode.getId(), tid, remoteAddress, t, routingList

.getNearestNodes(new NodeId(infoHash.getBytes(CharsetName.IOS_8859_1))));

if (onGetPeersListener != null) {

announcePeerData.add(new AnnouncePeerData(remoteAddress,

((BencodeMap) (response.get(new BencodeString("a"))))

.get(new BencodeString("info_hash")).getData(),

remoteAddress.getPort()));

// onGetPeersListener.onGetPeers(remoteAddress,

// ((BencodeMap) (response.get(new BencodeString("a"))))

// .get(new BencodeString("info_hash")).getData());

}

return;

}

 

// 处理announce_peer请求, 这个是对方告诉我们他已经找到了文件的下载地址, 我们这里只获取他正在下载的磁力链接,

// 这里获取的磁力链接基本上是有效的

if (q.equals("announce_peer")) {

// 获取infoHash

// String infoHash = new String(((BencodeMap) (response.get(new BencodeString("a"))))

// .get(new BencodeString("info_hash")).getData(), CharsetName.IOS_8859_1);

 

// 转化为磁力链接并打印

// printMagnet(infoHash, "announce_peer");

// 响应对方的announce_peer请求

byte[] tid = getNeighbor(localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),

readData.getId().getBytes(CharsetName.IOS_8859_1));

responseAnnouncePeer(readData.getId(), tid, remoteAddress, t);

if (onAnnouncePeerListener != null) {

announcePeerData.add(new AnnouncePeerData(remoteAddress,

((BencodeMap) (response.get(new BencodeString("a"))))

.get(new BencodeString("info_hash")).getData(),

remoteAddress.getPort()));

// onAnnouncePeerListener.onAnnouncePeer(remoteAddress,

// ((BencodeMap) (response.get(new BencodeString("a"))))

// .get(new BencodeString("info_hash")).getData(),

// remoteAddress.getPort());

}

return;

}

}

 

} catch (Exception ex) {

ex.printStackTrace();

log.error(ex.getMessage());

}

}

} catch (Exception e) {

e.printStackTrace();

log.error(e.getMessage());

}

}

}, 0, 1, TimeUnit.MILLISECONDS);

 

// 每秒对当前DHTServer维护的所有本地节点执行一次find_node操作, 让更多的节点认识我们, 这样才能有更多的节点发送磁力链接到我们节点

worker.scheduleAtFixedRate(new Runnable() {

@Override

publicvoid run() {

try {

if (Main.me.isShutdown) {

sleep(20000);

// System.out.println("find_node waiting...........................");

return;

}

 

// 遍历本地节点

for (LocalDHTNode localDHTNode : localDHTNodes) {

// LocalDHTNode localDHTNode = localDHTNodes.get(localDHTNodesIng);

 

Stack<NodeInfo> newNodes = localDHTNode.getNewNodes();

// System.out.println("遍历本地节点"+newNodes.size());

Set<String> oldNodes = localDHTNode.getOldNodes();

String id = localDHTNode.getId();

// 如果新节点列表为空, 则重新加入根节点, 重新开始查找

if (newNodes.isEmpty()) {

// System.out.println("CODE:001:如果新节点列表为空, 则重新加入根节点, 重新开始查找");

newNodes.push(ROOT_NODES[0]);

newNodes.push(ROOT_NODES[1]);

newNodes.push(ROOT_NODES[2]);

log.info("CODE:001:如果新节点列表为空, 则重新加入根节点, 重新开始查找");

}

// 如果新节点数量过多, 防止占用过多内存, 移除一半的最旧节点, 这里用栈实现, 每次移除栈底的元素

if (newNodes.size() > pNewNodes) {

intcount = newNodes.size() / 2;

for (inti = 0; i < count; i++) {

newNodes.remove(0);

}

}

// 获取最新的节点, 用来发送find_node请求

NodeInfo latestNode = newNodes.pop();

// 获取最新节点的ip和端口

InetSocketAddress target = new InetSocketAddress(latestNode.getIp(), latestNode.getPort());

// 发送find_node请求

// latestNode.getId().getData()

// findNode(localDHTNode.getId(), target, randomTargetId(id));

byte[] nid = null;

if (latestNode.getId() != null) {

nid = getNeighbor(localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),

latestNode.getId().getData(CharsetName.IOS_8859_1));

findNode(localDHTNode.getId(), nid, target, randomTargetId(id));

} else {

findNode(localDHTNode.getId(), localDHTNode.getId().getBytes(CharsetName.IOS_8859_1),

target, randomTargetId(id));

}

// 将请求过的节点加入到旧节点列表中

byte[] ipData = latestNode.getIpData();

byte[] portData = latestNode.getPortData();

oldNodes.add(new String(

newbyte[] { ipData[0], ipData[1], ipData[2], ipData[3], portData[0], portData[1] },

CharsetName.IOS_8859_1));

 

// 如果旧节点数量过多, 防止占用过多内存, 移除一半最旧的旧节点, 这里的oldNodes是用LinkedHashSet实现, 因此按顺序移除将先移除最旧的

if (oldNodes.size() > pOldNodes) {

intcount = oldNodes.size() / 2;

for (Iterator<String> it = oldNodes.iterator(); it.hasNext() && count-- > 0; it.remove()) {

}

}

}

} catch (Exception e) {

log.error(e.getMessage());

e.printStackTrace();

}

}

}, 0, 1, TimeUnit.SECONDS);

 

// 每2毫秒对AnnouncePeerData 进行下载

worker.scheduleAtFixedRate(new Runnable() {

@Override

publicvoid run() {

try {

if (Main.me.isShutdown) {

sleep(20000);

System.out.println("AnnouncePeerData waiting...........................");

return;

}

AnnouncePeerData querData = getAnnouncePeerData().poll();

if (querData != null) {

 

onAnnouncePeerListener.onAnnouncePeer(querData.getAddress(), querData.getInfo_hash(),

querData.getPort());

}

} catch (Exception e) {

log.error("DHTServer:AnnouncePeerData-" + e.getMessage());

e.printStackTrace();

}

}

}, 0, 10, TimeUnit.MILLISECONDS);

}

 

/**

* 发送findNode请求

*

* @paramid 本地节点的id

* @param nid 違裝本地節點NID

* @paramtarget 目标节点的地址

* @paramtargetId 目标节点的id

*/

publicvoid findNode(String id, byte[] nid, InetSocketAddress target, String targetId) {

try {

// 发送请求时候需要带一个transactionId, 这里生成两个字节的随机字符

ByteBuffer bb = ByteBuffer.allocate(2).putShort((short) new Random().nextInt(Short.MAX_VALUE));

bb.flip();

String transactionId = new String(bb.array(), CharsetName.IOS_8859_1);

 

// 请求数据放入到队列中等待异步发送

nioHelper.write(id,

new NIOHelper.WriteData(DHTHelper.getFindNodeData(transactionId, nid, targetId), target));

} catch (Exception e) {

// throw new RuntimeException(e);

e.printStackTrace();

}

}

 

/**

* 响应ping请求

*

* @paramid 本地节点的id

* @paramtarget 目标节点的地址

* @paramt 目标节点请求时发送过来的transactionId

*/

publicvoid responsePing(String id, InetSocketAddress target, String t) {

// 响应数据放入到队列中等待异步发送

nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getPingResponseData(t, id), target));

 

}

 

/**

* 响应find_node请求

*

* @paramid 本地节点的id

* @paramtarget 目标节点的地址

* @paramt 目标节点请求时发送过来的transactionId

* @paramnodes 回复给目标节点的最近的八个节点信息

*/

publicvoid responseFindNode(String id, InetSocketAddress target, String t, List<NodeInfo> nodes) {

// 响应数据放入到队列中等待异步发送

nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getFindNodeResponseData(t, id, nodes), target));

}

 

/**

* 响应get_peers请求

*

* @paramid 本地节点的id

* @paramtarget 目标节点的地址

* @paramt 目标节点请求时发送过来的transactionId

* @paramnodes 回复给目标节点的最近的八个节点信息

*/

publicvoid responseGetPeers(String id, byte[] bid, InetSocketAddress target, String t, List<NodeInfo> nodes) {

// 响应数据放入到队列中等待异步发送

// if(isAllowedIp(target.getHostString()))

nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getGetPeersResponseData(t, id, bid, nodes), target));

 

}

 

/**

* 响应announce_peer请求

*

* @paramid 本地节点的id

* @paramtarget 目标节点的地址

* @paramt 目标节点请求时发送过来的transactionId

*/

publicvoid responseAnnouncePeer(String id, byte[] bid, InetSocketAddress target, String t) {

// 响应数据放入到队列中等待异步发送

// if(isAllowedIp(target.getHostString()))

nioHelper.write(id, new NIOHelper.WriteData(DHTHelper.getAnnouncePeerResponseData(t, id, bid), target));

}

 

/**

* 打印得到的磁力链接

*

* @paraminfoHash

* @paramsource 获得磁力链接的来源 : 其他节点的get_peers和announce_peer请求

*/

@SuppressWarnings("deprecation")

privatestaticvoidprintMagnet(String infoHash, String source) {

StringBuilder magnet = new StringBuilder(new Date().toLocaleString()).append(" magnet:?xt=urn:btih:");

 

// infoHash的长度为20字节, 每个字节的值转为16进制后就是磁力链接

for (charc : infoHash.toCharArray()) {

String hs = Integer.toHexString(c);

// 如果转为16进制后的长度不足2位则第一位补0

if (hs.length() == 1) {

magnet.append(0);

}

magnet.append(hs);

}

System.out.println(source + " " + magnet);

}

 

/**

* 根据一个节点id随机获取一个目标节点id, 如果每次都只根据自身的节点id进行find_node操作很容易就会遍历完所有的节点导致无节点可遍历,

* 因此这里根据节点的距离按照一定概率生成一个随机的目标节点id, 距离越远的目标节点id生成的概率越小

*

* @paramid 本地节点的id

* @return

*/

privatestatic String randomTargetId(String id) {

inti = ThreadLocalRandom.current().nextInt();

// 有百分之一的概率产生一个基本完全随机的节点id

if (i % 100 == 0) {

return (char) (i % 256) + UUID.randomUUID().toString().substring(0, 19);

}

 

inti2 = ThreadLocalRandom.current().nextInt();

// 十六分之一的概率修改原本节点id的最后一个字节, 修改的字节越靠后, 被其他节点保存的概率越大

if ((i & 0B1111) == (i2 & 0B1111)) {

char[] cs = id.toCharArray();

cs[cs.length - 1] = (char) (i & 0B011111111);

returnnew String(cs);

}

returnid;

}

 

/**

* getNeighbor

*

* @paramlocal_info_hash 本地节点的id

* @paraminfo_hash 目標節點ID

*/

privatebyte[] getNeighbor(byte[] local_info_hash, byte[] info_hash) {

byte[] bytes = newbyte[20];

System.arraycopy(info_hash, 0, bytes, 0, 10);

System.arraycopy(local_info_hash, 10, bytes, 10, 10);

returnbytes;

}

 

privatebooleanisAllowedIp(String ip) {

if (ls == null || allowedIp == null)

returntrue;

if (ls != null && allowedIp != null && !allowedIp.containsKey(ls.getCountry(ip).getCode())) {

returnfalse;

}

returntrue;

}

 

publicstaticbyte[] createRandomNodeId() {

Random random = new Random();

byte[] r = newbyte[20];

random.nextBytes(r);

returnr;

}

 

publicvoid setOnGetPeersListener(OnGetPeersListener onGetPeersListener) {

this.onGetPeersListener = onGetPeersListener;

}

 

publicvoid setOnAnnouncePeerListener(OnAnnouncePeerListener onAnnouncePeerListener) {

this.onAnnouncePeerListener = onAnnouncePeerListener;

}

 

publicvoid stopAll() {

this.interrupt();

// if (channel != null)

// channel.close().awaitUninterruptibly();

// if (b != null)

// b.releaseExternalResources();

 

nioHelper.stopAll();

if (worker != null) {

worker.shutdown();

System.out.println("worker.shutdown();........................");

try {

worker.awaitTermination(10, TimeUnit.MINUTES);

} catch (InterruptedException e) {

worker.shutdownNow();

e.printStackTrace();

System.out.println("worker.shutdownNow();........................");

}

}

}

 

public Map<String, String> getAllowedIp() {

returnallowedIp;

}

 

publicvoid setAllowedIp(Map<String, String> allowedIp) {

this.allowedIp = allowedIp;

}

 

public Queue<AnnouncePeerData> getAnnouncePeerData() {

returnannouncePeerData;

}

}

 

标签:String,get,DHTServer,id,new,节点,请求
From: https://www.cnblogs.com/okeyl/p/17263354.html

相关文章