在看到本文之前,如果讀者沒看過筆者的前文Mina網(wǎng)絡(luò)通信框架 ,請先翻閱。
在本節(jié)中,我將用 Mina 框架,同時(shí)實(shí)現(xiàn)服務(wù)器和客戶端,實(shí)現(xiàn)客戶端與客戶端之間的 IM 通信,使用 Java JDBC 接口連入 MySQL 數(shù)據(jù)庫,實(shí)現(xiàn)一個具有登錄功能的簡易 IM 聊天工具。
首先,我們來看看效果圖:
使用 Navicat 可視化操作工具創(chuàng)建并連接數(shù)據(jù)庫

啟動 Mina 服務(wù)器

啟動登錄界面

注冊失敗演示

注冊成功演示


觀察數(shù)據(jù)庫,發(fā)現(xiàn)添加了新用戶

登錄后的聊天窗口

再登錄一個用戶

發(fā)送消息演示

下面,筆者貼出幾個核心代碼部分:
數(shù)據(jù)庫登錄處理邏輯
package database;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.mina.core.session.IoSession;
import util.MessageUtil;
import util.SessionUtil;
import common.Common;
/**
* 管理客戶端的數(shù)據(jù)庫
*/
public class ClientJDBC {
public ClientJDBC() {
try {
// 加載JDBC Driver
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
/**
* <pre>
* 1、查詢是否已存在此用戶
* 2、注冊,添加新數(shù)據(jù)
*/
public void addNewData(String name, String password, IoSession loginSession) {
try {
// 建立連接
Connection connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/clientdb", "root", "");
// 創(chuàng)建Statement
Statement statement = connection.createStatement();
// 查詢是否已存在此用戶
String selectSQL = "select password from client where name=" + "'"
+ name + "'";
ResultSet set = statement.executeQuery(selectSQL);
// 如果不存在此用戶
if (!set.next()) {
// 添加數(shù)據(jù)
PreparedStatement preparedStatement = connection
.prepareStatement("insert into client values(null,?,?)");
preparedStatement.setString(1, name);
preparedStatement.setString(2, password);
preparedStatement.executeUpdate();
// 提示用戶注冊成功
loginSession.write(Common.CANREG);
} else {
// 提示用戶已注冊
loginSession.write(Common.CANTRE);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
/**
* <pre>
* 1、查詢帳號密碼是否匹配
* 2、查詢是否用戶已登錄
* 3、登錄
*/
public boolean login(String name, String password, IoSession userSession) {
try {
// 建立連接
Connection connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/clientdb", "root", "");
// 創(chuàng)建Statement
Statement statement = connection.createStatement();
// 查詢是否已存在此用戶
String selectSQL = "select password from client where name=" + "'"
+ name + "'";
ResultSet set = statement.executeQuery(selectSQL);
// 如果存在此用戶
if (set.next()) {
// 密碼匹配成功
if (set.getString("password").equals(password)) {
// 如果用戶沒登錄
if (SessionUtil.getSessionFromName(name) == null) {
// 存儲用戶名
userSession.setAttribute("name", name);
// 登錄成功
String s = name;
String message = MessageUtil.createSendMessage(Common.LOGIN_SUCCESS, s);
userSession.write(message);
// 發(fā)送在線列表給所有客戶端
SessionUtil.sendClientListToAll();
} else {
// 提示用戶已登錄
userSession.write(Common.IS_LOGIN);
}
} else {
// 提示帳號和密碼不匹配
userSession.write(Common.ISNT_MATCH);
}
} else {
// 提示帳號和密碼不匹配
userSession.write(Common.ISNT_MATCH);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
return true;
}
}
服務(wù)器端Handler處理邏輯
注意,此處繼承的是StreamIoHandler,為了后續(xù)對傳輸文件進(jìn)行操作
package server;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import thread.IoStreamThreadWork;
import util.SessionUtil;
import common.Common;
/**
* Mina Handler
*/
public class ServerHandler extends StreamIoHandler {
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
System.out.println(session.getRemoteAddress() + " "
+ "exceptionCaught");
System.out.println(cause);
}
/**
* mina只能接收以換行符結(jié)束的信息
*/
@Override
public void messageReceived(IoSession session, Object message) {
System.out.println(session.getRemoteAddress() + " "
+ "messageReceived");
// 解析信息
String result = (String) message;
// 注冊信息
if (result.startsWith(Common.REGISTER) && result.length() > 10) {
String suffix = result.substring(10);
String[] array = suffix.split(",");
MinaServer.clientJDBC.addNewData(array[0], array[1], session);
}
// 登錄信息
else if (result.startsWith(Common.LOGIN) && result.length() > 10) {
String suffix = result.substring(10);
String[] array = suffix.split(",");
MinaServer.clientJDBC.login(array[0], array[1], session);
}
// 登出信息
else if (result.equals(Common.LOGOUT) && result.length() == 6) {
// 關(guān)閉當(dāng)前session
session.close();
// 發(fā)送在線列表給所有客戶端
SessionUtil.sendClientListToAll();
}
// 用戶之間發(fā)送消息
else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
String suffix = result.substring(10);
// 分隔選中用戶名和要發(fā)送的消息
String[] array = suffix.split(Common.SEPARATOR);
// 向指定用戶發(fā)送消息
if (session.getAttribute("name") != null)
SessionUtil.sendToSelectedClient(
(String) session.getAttribute("name"), array[0],
array[1]);
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println(session.getRemoteAddress() + " " + "messageSent");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out
.println(session.getRemoteAddress() + " " + "sessionClosed");
}
/**
* 用戶創(chuàng)建的時(shí)候保留會話的ip和端口
*/
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println(session.getRemoteAddress() + " "
+ "sessionCreated");
// 提取客戶端ip和端口
String s = session.getRemoteAddress() + "";
String port = s.substring(s.indexOf(":") + 1);
String ip = s.substring((s.indexOf("/") + 1),
s.length() - 1 - port.length());
// 存儲ip和端口
if (session.getAttribute("ip") == null)
session.setAttribute("ip", ip);
if (session.getAttribute("port") == null)
session.setAttribute("port", port);
}
/**
* 會話空閑狀態(tài)
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
System.out.println(session.getRemoteAddress() + " " + "sessionIdle");
}
@Override
public void sessionOpened(IoSession session) {
System.out
.println(session.getRemoteAddress() + " " + "sessionOpened");
}
@Override
protected void processStreamIo(IoSession session, InputStream input,
OutputStream output) {
System.out.println(session.getRemoteAddress() + " "
+ "processStreamIo");
// 設(shè)定一個線程池
// 參數(shù)說明:最少數(shù)量3,最大數(shù)量6 空閑時(shí)間 3秒
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
TimeUnit.SECONDS,
// 緩沖隊(duì)列為3
new ArrayBlockingQueue<Runnable>(3),
// 拋棄舊的任務(wù)
new ThreadPoolExecutor.DiscardOldestPolicy());
FileOutputStream fos = null;
// 此處路徑如何動態(tài)設(shè)定。
File receiveFile = new File("F:\\111.jks");
try {
fos = new FileOutputStream(receiveFile);
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} finally {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 將線程放入線程池 當(dāng)連接很多時(shí)候可以通過線程池處理
threadPool.execute(new IoStreamThreadWork(input, fos));
}
}
客戶端Handler處理邏輯
package client;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.swing.DefaultListModel;
import javax.swing.JOptionPane;
import javax.swing.JTextArea;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import thread.IoStreamThreadWork;
import common.Common;
import frame.ChatFrame;
import frame.LoginFrame;
/**
* Mina Handler
*/
// IoHandlerAdapter
public class ClientHandler extends StreamIoHandler {
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
System.out.println(session.getLocalAddress() + " "
+ "exceptionCaught");
System.out.println(cause);
}
/**
* mina只能接收以換行符結(jié)束的信息
*/
@Override
public void messageReceived(IoSession session, Object message) {
System.out.println(session.getLocalAddress() + " "
+ "messageReceived");
// 解析信息
String result = (String) message;
// 注冊成功
if (result.equals(Common.CANREG) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "注冊成功!");
}
// 用戶已存在注冊失敗
else if (result.equals(Common.CANTRE) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "用戶已存在!");
}
// 用戶已登錄,登錄失敗
else if (result.equals(Common.IS_LOGIN) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "用戶已登錄!");
session.close();// 關(guān)閉這個Session
}
// 帳號和密碼不匹配,登錄失敗
else if (result.equals(Common.ISNT_MATCH) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "帳號和密碼不匹配!");
session.close();// 關(guān)閉這個Session
}
// 登錄成功
else if (result.startsWith(Common.LOGIN_SUCCESS)
&& result.length() > 10) {
String suffix = result.substring(10);
// 從Attribute中取出登錄窗口
LoginFrame frame = (LoginFrame) session.getAttribute("loginFrame");
// 關(guān)閉登錄窗口
frame.dispose();
// 打開聊天窗口
new ChatFrame(session).setTitle("用戶" + suffix + "的聊天窗口");
}
// 服務(wù)器發(fā)送在線列表消息
else if (result.startsWith(Common.SERVER) && result.length() > 10) {
String suffix = result.substring(10);
String[] array = suffix.split(",");
// 判斷空,因?yàn)橛衛(wèi)oginSession
if (session.getAttribute("clientListModel") != null) {
// 取出clientListModel
DefaultListModel<String> clientListModel = (DefaultListModel<String>) (session
.getAttribute("clientListModel"));
// 清空原視圖
clientListModel.removeAllElements();
// 重新加入在線用戶列表
for (String clientName : array)
clientListModel.addElement(clientName);
}
}
// 客戶發(fā)送過來的消息
else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
String suffix = result.substring(10);
// if (session.getAttribute("receivedMessage") != null)
JTextArea jta = (JTextArea) session.getAttribute("receivedMessage");
jta.append(suffix + "\n");
// 滾動到底端
jta.setCaretPosition(jta.getText().length());
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println(session.getLocalAddress() + " " + "messageSent");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println(session.getLocalAddress() + " " + "sessionClosed");
}
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out
.println(session.getLocalAddress() + " " + "sessionCreated");
}
/**
* 會話空閑狀態(tài)
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
System.out.println(session.getLocalAddress() + " " + "sessionIdle");
}
@Override
public void sessionOpened(IoSession session) {
System.out.println(session.getLocalAddress() + " " + "sessionOpened");
}
@Override
protected void processStreamIo(IoSession session, InputStream input,
OutputStream output) {
System.out.println(session.getLocalAddress() + " "
+ "processStreamIo");
// 客戶端發(fā)送文件
File sendFile = new File("e:\\MyKey.jks");
FileInputStream fis = null;
try {
fis = new FileInputStream(sendFile);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
// 放入線程讓其執(zhí)行
// 客戶端一般都用一個線程實(shí)現(xiàn)即可 不用線程池
new IoStreamThreadWork(fis, output).start();
return;
}
}
在 Mina 的使用過程中,應(yīng)注意以下問題:
【1】客戶端IoSession.setAttribute后,服務(wù)器IoSession.getAttribute為null,這兩個session雖然getId一樣,但不是同一個session
【2】使用TextLineCodecFactory,messageReceived方法默認(rèn)只能接收以換行符結(jié)束的信息
【3】使用getAttribute時(shí)應(yīng)注意判斷空指針:if (session.getAttribute("name") != null && session.getAttribute("name").equals(name))
否則不會報(bào)錯,但下面的程序段會終止執(zhí)行??稍趀xceptionCaught方法中catch到異常
【4】每個客戶端對應(yīng)一個ConnectFuture,每個ConnectFuture對應(yīng)一個IoSession
【5】可以巧用session.setAttribute("loginFrame", LoginFrame.this);來進(jìn)行窗口的傳遞,注意:如果在監(jiān)聽中直接傳this,要注意是不是LoginFrame.this
【6】mina不能動態(tài)修改acceptor的ProtocolCodecFilter和Handler
【7】傳輸文件可通過StreamIoHandler和MessageEncoder的方式
整個案例的源碼及相關(guān) jar 包在百度云中給出鏈接 Mina,讀者如要運(yùn)行項(xiàng)目,還需自行安裝 MySQL 創(chuàng)建數(shù)據(jù)庫。
謝謝您的關(guān)注和支持!