Mina網(wǎng)絡(luò)通信框架(續(xù))

在看到本文之前,如果讀者沒看過筆者的前文Mina網(wǎng)絡(luò)通信框架 ,請先翻閱。

在本節(jié)中,我將用 Mina 框架,同時(shí)實(shí)現(xiàn)服務(wù)器和客戶端,實(shí)現(xiàn)客戶端與客戶端之間的 IM 通信,使用 Java JDBC 接口連入 MySQL 數(shù)據(jù)庫,實(shí)現(xiàn)一個具有登錄功能的簡易 IM 聊天工具。

首先,我們來看看效果圖:

使用 Navicat 可視化操作工具創(chuàng)建并連接數(shù)據(jù)庫


Paste_Image.png

啟動 Mina 服務(wù)器


Paste_Image.png

啟動登錄界面


Paste_Image.png

注冊失敗演示


Paste_Image.png

注冊成功演示


Paste_Image.png
Paste_Image.png

觀察數(shù)據(jù)庫,發(fā)現(xiàn)添加了新用戶


Paste_Image.png

登錄后的聊天窗口


Paste_Image.png

再登錄一個用戶


Paste_Image.png

發(fā)送消息演示

Paste_Image.png

下面,筆者貼出幾個核心代碼部分:

數(shù)據(jù)庫登錄處理邏輯

package database;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.mina.core.session.IoSession;

import util.MessageUtil;
import util.SessionUtil;
import common.Common;

/**
 * 管理客戶端的數(shù)據(jù)庫
 */
public class ClientJDBC {

    public ClientJDBC() {
        try {
            // 加載JDBC Driver
            Class.forName("com.mysql.jdbc.Driver");

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * <pre>
     * 1、查詢是否已存在此用戶
     * 2、注冊,添加新數(shù)據(jù)
     */
    public void addNewData(String name, String password, IoSession loginSession) {
        try {
            // 建立連接
            Connection connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/clientdb", "root", "");

            // 創(chuàng)建Statement
            Statement statement = connection.createStatement();
            // 查詢是否已存在此用戶
            String selectSQL = "select password from client where name=" + "'"
                    + name + "'";
            ResultSet set = statement.executeQuery(selectSQL);
            // 如果不存在此用戶
            if (!set.next()) {
                // 添加數(shù)據(jù)
                PreparedStatement preparedStatement = connection
                        .prepareStatement("insert into client values(null,?,?)");
                preparedStatement.setString(1, name);
                preparedStatement.setString(2, password);
                preparedStatement.executeUpdate();
                // 提示用戶注冊成功
                loginSession.write(Common.CANREG);
            } else {
                // 提示用戶已注冊
                loginSession.write(Common.CANTRE);
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * <pre>
     * 1、查詢帳號密碼是否匹配
     * 2、查詢是否用戶已登錄
     * 3、登錄
     */
    public boolean login(String name, String password, IoSession userSession) {
        try {
            // 建立連接
            Connection connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/clientdb", "root", "");

            // 創(chuàng)建Statement
            Statement statement = connection.createStatement();

            // 查詢是否已存在此用戶
            String selectSQL = "select password from client where name=" + "'"
                    + name + "'";
            ResultSet set = statement.executeQuery(selectSQL);
            // 如果存在此用戶
            if (set.next()) {
                // 密碼匹配成功
                if (set.getString("password").equals(password)) {
                    // 如果用戶沒登錄
                    if (SessionUtil.getSessionFromName(name) == null) {
                        // 存儲用戶名
                        userSession.setAttribute("name", name);
                        // 登錄成功
                        String s = name;
                        String message = MessageUtil.createSendMessage(Common.LOGIN_SUCCESS, s);
                        userSession.write(message);
                        // 發(fā)送在線列表給所有客戶端
                        SessionUtil.sendClientListToAll();
                    } else {
                        // 提示用戶已登錄
                        userSession.write(Common.IS_LOGIN);
                    }
                } else {
                    // 提示帳號和密碼不匹配
                    userSession.write(Common.ISNT_MATCH);
                }
            } else {
                // 提示帳號和密碼不匹配
                userSession.write(Common.ISNT_MATCH);
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return true;
    }

}

服務(wù)器端Handler處理邏輯
注意,此處繼承的是StreamIoHandler,為了后續(xù)對傳輸文件進(jìn)行操作

package server;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;

import thread.IoStreamThreadWork;
import util.SessionUtil;
import common.Common;

/**
 * Mina Handler
 */
public class ServerHandler extends StreamIoHandler {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        System.out.println(session.getRemoteAddress() + "   "
                + "exceptionCaught");
        System.out.println(cause);
    }

    /**
     * mina只能接收以換行符結(jié)束的信息
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println(session.getRemoteAddress() + "   "
                + "messageReceived");

        // 解析信息
        String result = (String) message;

        // 注冊信息
        if (result.startsWith(Common.REGISTER) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            MinaServer.clientJDBC.addNewData(array[0], array[1], session);
        }
        // 登錄信息
        else if (result.startsWith(Common.LOGIN) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            MinaServer.clientJDBC.login(array[0], array[1], session);
        }
        // 登出信息
        else if (result.equals(Common.LOGOUT) && result.length() == 6) {
            // 關(guān)閉當(dāng)前session
            session.close();
            // 發(fā)送在線列表給所有客戶端
            SessionUtil.sendClientListToAll();
        }
        // 用戶之間發(fā)送消息
        else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
            String suffix = result.substring(10);
            // 分隔選中用戶名和要發(fā)送的消息
            String[] array = suffix.split(Common.SEPARATOR);
            // 向指定用戶發(fā)送消息
            if (session.getAttribute("name") != null)
                SessionUtil.sendToSelectedClient(
                        (String) session.getAttribute("name"), array[0],
                        array[1]);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println(session.getRemoteAddress() + "   " + "messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out
                .println(session.getRemoteAddress() + "   " + "sessionClosed");
    }

    /**
     * 用戶創(chuàng)建的時(shí)候保留會話的ip和端口
     */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println(session.getRemoteAddress() + "   "
                + "sessionCreated");
        // 提取客戶端ip和端口
        String s = session.getRemoteAddress() + "";
        String port = s.substring(s.indexOf(":") + 1);
        String ip = s.substring((s.indexOf("/") + 1),
                s.length() - 1 - port.length());
        // 存儲ip和端口
        if (session.getAttribute("ip") == null)
            session.setAttribute("ip", ip);
        if (session.getAttribute("port") == null)
            session.setAttribute("port", port);
    }

    /**
     * 會話空閑狀態(tài)
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        System.out.println(session.getRemoteAddress() + "   " + "sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) {
        System.out
                .println(session.getRemoteAddress() + "   " + "sessionOpened");
    }

    @Override
    protected void processStreamIo(IoSession session, InputStream input,
            OutputStream output) {
        System.out.println(session.getRemoteAddress() + "   "
                + "processStreamIo");
        // 設(shè)定一個線程池
        // 參數(shù)說明:最少數(shù)量3,最大數(shù)量6 空閑時(shí)間 3秒
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
                TimeUnit.SECONDS,
                // 緩沖隊(duì)列為3
                new ArrayBlockingQueue<Runnable>(3),
                // 拋棄舊的任務(wù)
                new ThreadPoolExecutor.DiscardOldestPolicy());
        FileOutputStream fos = null;
        // 此處路徑如何動態(tài)設(shè)定。
        File receiveFile = new File("F:\\111.jks");

        try {
            fos = new FileOutputStream(receiveFile);
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        } finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // 將線程放入線程池 當(dāng)連接很多時(shí)候可以通過線程池處理
        threadPool.execute(new IoStreamThreadWork(input, fos));
    }
}

客戶端Handler處理邏輯

package client;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.swing.DefaultListModel;
import javax.swing.JOptionPane;
import javax.swing.JTextArea;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;

import thread.IoStreamThreadWork;

import common.Common;

import frame.ChatFrame;
import frame.LoginFrame;

/**
 * Mina Handler
 */
// IoHandlerAdapter
public class ClientHandler extends StreamIoHandler {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        System.out.println(session.getLocalAddress() + "   "
                + "exceptionCaught");
        System.out.println(cause);
    }

    /**
     * mina只能接收以換行符結(jié)束的信息
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println(session.getLocalAddress() + "   "
                + "messageReceived");

        // 解析信息
        String result = (String) message;

        // 注冊成功
        if (result.equals(Common.CANREG) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "注冊成功!");
        }
        // 用戶已存在注冊失敗
        else if (result.equals(Common.CANTRE) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "用戶已存在!");
        }
        // 用戶已登錄,登錄失敗
        else if (result.equals(Common.IS_LOGIN) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "用戶已登錄!");
            session.close();// 關(guān)閉這個Session
        }
        // 帳號和密碼不匹配,登錄失敗
        else if (result.equals(Common.ISNT_MATCH) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "帳號和密碼不匹配!");
            session.close();// 關(guān)閉這個Session
        }
        // 登錄成功
        else if (result.startsWith(Common.LOGIN_SUCCESS)
                && result.length() > 10) {
            String suffix = result.substring(10);
            // 從Attribute中取出登錄窗口
            LoginFrame frame = (LoginFrame) session.getAttribute("loginFrame");
            // 關(guān)閉登錄窗口
            frame.dispose();
            // 打開聊天窗口
            new ChatFrame(session).setTitle("用戶" + suffix + "的聊天窗口");
        }
        // 服務(wù)器發(fā)送在線列表消息
        else if (result.startsWith(Common.SERVER) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            // 判斷空,因?yàn)橛衛(wèi)oginSession
            if (session.getAttribute("clientListModel") != null) {
                // 取出clientListModel
                DefaultListModel<String> clientListModel = (DefaultListModel<String>) (session
                        .getAttribute("clientListModel"));
                // 清空原視圖
                clientListModel.removeAllElements();
                // 重新加入在線用戶列表
                for (String clientName : array)
                    clientListModel.addElement(clientName);
            }
        }
        // 客戶發(fā)送過來的消息
        else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
            String suffix = result.substring(10);
            // if (session.getAttribute("receivedMessage") != null)
            JTextArea jta = (JTextArea) session.getAttribute("receivedMessage");
            jta.append(suffix + "\n");
            // 滾動到底端
            jta.setCaretPosition(jta.getText().length());
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println(session.getLocalAddress() + "   " + "messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println(session.getLocalAddress() + "   " + "sessionClosed");
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out
                .println(session.getLocalAddress() + "   " + "sessionCreated");
    }

    /**
     * 會話空閑狀態(tài)
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        System.out.println(session.getLocalAddress() + "   " + "sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) {
        System.out.println(session.getLocalAddress() + "   " + "sessionOpened");
    }

    @Override
    protected void processStreamIo(IoSession session, InputStream input,
            OutputStream output) {
        System.out.println(session.getLocalAddress() + "   "
                + "processStreamIo");
        // 客戶端發(fā)送文件
        File sendFile = new File("e:\\MyKey.jks");
        FileInputStream fis = null;
        try {
            fis = new FileInputStream(sendFile);

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        // 放入線程讓其執(zhí)行
        // 客戶端一般都用一個線程實(shí)現(xiàn)即可 不用線程池
        new IoStreamThreadWork(fis, output).start();
        return;

    }
}

在 Mina 的使用過程中,應(yīng)注意以下問題:

【1】客戶端IoSession.setAttribute后,服務(wù)器IoSession.getAttribute為null,這兩個session雖然getId一樣,但不是同一個session

【2】使用TextLineCodecFactory,messageReceived方法默認(rèn)只能接收以換行符結(jié)束的信息

【3】使用getAttribute時(shí)應(yīng)注意判斷空指針:if (session.getAttribute("name") != null && session.getAttribute("name").equals(name))
否則不會報(bào)錯,但下面的程序段會終止執(zhí)行??稍趀xceptionCaught方法中catch到異常

【4】每個客戶端對應(yīng)一個ConnectFuture,每個ConnectFuture對應(yīng)一個IoSession

【5】可以巧用session.setAttribute("loginFrame", LoginFrame.this);來進(jìn)行窗口的傳遞,注意:如果在監(jiān)聽中直接傳this,要注意是不是LoginFrame.this

【6】mina不能動態(tài)修改acceptor的ProtocolCodecFilter和Handler

【7】傳輸文件可通過StreamIoHandler和MessageEncoder的方式

整個案例的源碼及相關(guān) jar 包在百度云中給出鏈接 Mina,讀者如要運(yùn)行項(xiàng)目,還需自行安裝 MySQL 創(chuàng)建數(shù)據(jù)庫。

謝謝您的關(guān)注和支持!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 會話(Session)跟蹤是Web程序中常用的技術(shù),用來跟蹤用戶的整個會話。常用的會話跟蹤技術(shù)是Cookie與Se...
    chinariver閱讀 5,786評論 1 49
  • 目錄Cookie機(jī)制什么是CookieCookie的不可跨域名性Unicode編碼:保存中文BASE64編碼:保存...
    Tomatoro閱讀 17,044評論 7 186
  • 有些事情的努力并不一定會很有用,但我不能否定它們
    杰star愛番石榴閱讀 176評論 0 1
  • 今天早上起來,我感到十分的輕松。心中想了一下,今天有什么任務(wù)呢?好像沒有什么任務(wù)。于是我就從床上爬了起來,...
    0328周宇同洲雨閱讀 257評論 2 1
  • 人生旅途的行囊中,或許我們早已裝滿了許多東西,但別忘記自己生命旅程中最重要的東西,生命的旅途不可能一帆風(fēng)順...
    懵懂情愫化詩雨閱讀 279評論 0 1

友情鏈接更多精彩內(nèi)容