數(shù)據(jù)庫中間件 MyCAT 源碼分析 —— SQL ON MongoDB

1. 概述

2. 主流程

3. 查詢操作

4. 插入操作

5. 彩蛋

1. 概述

可能你在看到這個標(biāo)題會小小的吃驚,MyCAT 能使用 MongoDB 做數(shù)據(jù)節(jié)點。是的,沒錯,確實可以。

吼吼吼,讓我們開啟這段神奇的“旅途”。

本文主要分成四部分:

總體流程,讓你有個整體的認(rèn)識

查詢操作

插入操作

彩蛋,??彩蛋,??彩蛋

建議你看過這兩篇文章(非必須):

《MyCAT 源碼分析 —— 【單庫單表】插入》

《MyCAT 源碼分析 —— 【單庫單表】查詢》

2. 主流程

MyCAT Server接收MySQL Client基于MySQL協(xié)議的請求,翻譯SQLMongoDB操作發(fā)送給MongoDB Server。

MyCAT Server接收MongoDB Server返回的MongoDB數(shù)據(jù),翻譯成MySQL數(shù)據(jù)結(jié)果返回給MySQL Client。

這樣一看,MyCAT 連接 MongoDB 是不是少神奇一點列。

Java數(shù)據(jù)庫連接,(Java Database Connectivity,簡稱JDBC)是Java語言中用來規(guī)范客戶端程序如何來訪問數(shù)據(jù)庫的應(yīng)用程序接口,提供了諸如查詢和更新數(shù)據(jù)庫中數(shù)據(jù)的方法。JDBC也是Sun Microsystems的商標(biāo)。JDBC是面向關(guān)系型數(shù)據(jù)庫的。

MyCAT 使用 JDBC 規(guī)范,抽象了對 MongoDB 的訪問。通過這樣的方式,MyCAT 也抽象了 SequoiaDB 的訪問??赡苓@樣說法有些抽象,看個類圖壓壓驚。

是不是熟悉的味道。不得不說 JDBC 規(guī)范的精妙。

3. 查詢操作

SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;

看順序圖已經(jīng)很方便的理解整體邏輯,我就不多廢話啦。我們來看幾個核心的代碼邏輯。

1、查詢 MongoDB

// MongoSQLParser.javapublicMongoDataquery()throwsMongoSQLException{if(!(statementinstanceofSQLSelectStatement)) {//return null;thrownewIllegalArgumentException("not a query sql statement");? }? MongoData mongo =newMongoData();? DBCursor c =null;? SQLSelectStatement selectStmt = (SQLSelectStatement) statement;? SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();inticount =0;if(sqlSelectQueryinstanceofMySqlSelectQueryBlock) {? ? ? MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();? ? ? BasicDBObject fields =newBasicDBObject();// 顯示(返回)的字段for(SQLSelectItem item : mysqlSelectQuery.getSelectList()) {//System.out.println(item.toString());if(!(item.getExpr()instanceofSQLAllColumnExpr)) {if(item.getExpr()instanceofSQLAggregateExpr) {? ? ? ? ? ? ? ? ? SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();if(expr.getMethodName().equals("COUNT")) {// TODO 待讀:count(*)icount =1;? ? ? ? ? ? ? ? ? ? ? mongo.setField(getExprFieldName(expr), Types.BIGINT);? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? fields.put(getExprFieldName(expr),1);? ? ? ? ? ? ? }else{? ? ? ? ? ? ? ? ? fields.put(getFieldName(item),1);? ? ? ? ? ? ? }? ? ? ? ? }? ? ? }// 表名SQLTableSource table = mysqlSelectQuery.getFrom();? ? ? DBCollection coll =this._db.getCollection(table.toString());? ? ? mongo.setTable(table.toString());// WHERESQLExpr expr = mysqlSelectQuery.getWhere();? ? ? DBObject query = parserWhere(expr);// GROUP BYSQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();? ? ? BasicDBObject gbkey =newBasicDBObject();if(groupby !=null) {for(SQLExpr gbexpr : groupby.getItems()) {if(gbexprinstanceofSQLIdentifierExpr) {? ? ? ? ? ? ? ? ? String name = ((SQLIdentifierExpr) gbexpr).getName();? ? ? ? ? ? ? ? ? gbkey.put(name, Integer.valueOf(1));? ? ? ? ? ? ? }? ? ? ? ? }? ? ? ? ? icount =2;? ? ? }// SKIP / LIMITintlimitoff =0;intlimitnum =0;if(mysqlSelectQuery.getLimit() !=null) {? ? ? ? ? limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());? ? ? ? ? limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());? ? ? }if(icount ==1) {// COUNT(*)mongo.setCount(coll.count(query));? ? ? }elseif(icount ==2) {// MapReduceBasicDBObject initial =newBasicDBObject();? ? ? ? ? initial.put("num",0);? ? ? ? ? String reduce ="function (obj, prev) { "+"? prev.num++}";? ? ? ? ? mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));? ? ? }else{if((limitoff >0) || (limitnum >0)) {? ? ? ? ? ? ? c = coll.find(query, fields).skip(limitoff).limit(limitnum);? ? ? ? ? }else{? ? ? ? ? ? ? c = coll.find(query, fields);? ? ? ? ? }// order bySQLOrderBy orderby = mysqlSelectQuery.getOrderBy();if(orderby !=null) {? ? ? ? ? ? ? BasicDBObject order =newBasicDBObject();for(inti =0; i < orderby.getItems().size(); i++) {? ? ? ? ? ? ? ? ? SQLSelectOrderByItem orderitem = orderby.getItems().get(i);? ? ? ? ? ? ? ? ? order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));? ? ? ? ? ? ? }? ? ? ? ? ? ? c.sort(order);// System.out.println(order);}? ? ? }? ? ? mongo.setCursor(c);? }returnmongo;}

2、查詢條件

// MongoSQLParser.javaprivatevoidparserWhere(SQLExpr aexpr, BasicDBObject o){if(aexprinstanceofSQLBinaryOpExpr) {? ? ? SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;? ? ? SQLExpr exprL = expr.getLeft();if(!(exprLinstanceofSQLBinaryOpExpr)) {if(expr.getOperator().getName().equals("=")) {? ? ? ? ? ? ? o.put(exprL.toString(), getExpValue(expr.getRight()));? ? ? ? ? }else{? ? ? ? ? ? ? String op ="";if(expr.getOperator().getName().equals("<")) {? ? ? ? ? ? ? ? ? op ="$lt";? ? ? ? ? ? ? }elseif(expr.getOperator().getName().equals("<=")) {? ? ? ? ? ? ? ? ? op ="$lte";? ? ? ? ? ? ? }elseif(expr.getOperator().getName().equals(">")) {? ? ? ? ? ? ? ? ? op ="$gt";? ? ? ? ? ? ? }elseif(expr.getOperator().getName().equals(">=")) {? ? ? ? ? ? ? ? ? op ="$gte";? ? ? ? ? ? ? }elseif(expr.getOperator().getName().equals("!=")) {? ? ? ? ? ? ? ? ? op ="$ne";? ? ? ? ? ? ? }elseif(expr.getOperator().getName().equals("<>")) {? ? ? ? ? ? ? ? ? op ="$ne";? ? ? ? ? ? ? }? ? ? ? ? ? ? parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));? ? ? ? ? }? ? ? }else{if(expr.getOperator().getName().equals("AND")) {? ? ? ? ? ? ? parserWhere(exprL, o);? ? ? ? ? ? ? parserWhere(expr.getRight(), o);? ? ? ? ? }elseif(expr.getOperator().getName().equals("OR")) {? ? ? ? ? ? ? orWhere(exprL, expr.getRight(), o);? ? ? ? ? }else{thrownewRuntimeException("Can't identify the operation of? of where");? ? ? ? ? }? ? ? }? }}privatevoidorWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob){? BasicDBObject xo =newBasicDBObject();? BasicDBObject yo =newBasicDBObject();? parserWhere(exprL, xo);? parserWhere(exprR, yo);? ob.put("$or",newObject[]{xo, yo});}

3、解析 MongoDB 數(shù)據(jù)

// MongoResultSet.javapublicMongoResultSet(MongoData mongo, String schema)throwsSQLException{this._cursor = mongo.getCursor();this._schema = schema;this._table = mongo.getTable();this.isSum = mongo.getCount() >0;this._sum = mongo.getCount();this.isGroupBy = mongo.getType();if(this.isGroupBy) {? ? ? dblist = mongo.getGrouyBys();this.isSum =true;? }if(this._cursor !=null) {? ? ? select = _cursor.getKeysWanted().keySet().toArray(newString[0]);// 解析 fieldsif(this._cursor.hasNext()) {? ? ? ? ? _cur = _cursor.next();if(_cur !=null) {if(select.length ==0) {? ? ? ? ? ? ? ? ? SetFields(_cur.keySet());? ? ? ? ? ? ? }? ? ? ? ? ? ? _row =1;? ? ? ? ? }? ? ? }// 設(shè)置 fields 類型if(select.length ==0) {? ? ? ? ? select =newString[]{"_id"};? ? ? ? ? SetFieldType(true);? ? ? }else{? ? ? ? ? SetFieldType(false);? ? ? }? }else{? ? ? SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};SetFieldType(mongo.getFields());? }}

當(dāng)使用SELECT *查詢字段時,fields 使用第一條數(shù)據(jù)返回的 fields。即使,后面的數(shù)據(jù)有其他 fields,也不返回。

4、返回數(shù)據(jù)給 MySQL Client

// JDBCConnection.javaprivatevoidouputResultSet(ServerConnection sc, String sql)throwsSQLException{? ResultSet rs =null;? Statement stmt =null;try{? ? ? stmt = con.createStatement();? ? ? rs = stmt.executeQuery(sql);// headerList fieldPks =newLinkedList<>();? ? ? ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,this.isSpark);intcolunmCount = fieldPks.size();? ? ? ByteBuffer byteBuf = sc.allocate();? ? ? ResultSetHeaderPacket headerPkg =newResultSetHeaderPacket();? ? ? headerPkg.fieldCount = fieldPks.size();? ? ? headerPkg.packetId = ++packetId;? ? ? byteBuf = headerPkg.write(byteBuf, sc,true);? ? ? byteBuf.flip();byte[] header =newbyte[byteBuf.limit()];? ? ? byteBuf.get(header);? ? ? byteBuf.clear();? ? ? List fields =newArrayList(fieldPks.size());for(FieldPacket curField : fieldPks) {? ? ? ? ? curField.packetId = ++packetId;? ? ? ? ? byteBuf = curField.write(byteBuf, sc,false);? ? ? ? ? byteBuf.flip();byte[] field =newbyte[byteBuf.limit()];? ? ? ? ? byteBuf.get(field);? ? ? ? ? byteBuf.clear();? ? ? ? ? fields.add(field);? ? ? }// header eofEOFPacket eofPckg =newEOFPacket();? ? ? eofPckg.packetId = ++packetId;? ? ? byteBuf = eofPckg.write(byteBuf, sc,false);? ? ? byteBuf.flip();byte[] eof =newbyte[byteBuf.limit()];? ? ? byteBuf.get(eof);? ? ? byteBuf.clear();this.respHandler.fieldEofResponse(header, fields, eof,this);// rowwhile(rs.next()) {? ? ? ? ? RowDataPacket curRow =newRowDataPacket(colunmCount);for(inti =0; i < colunmCount; i++) {intj = i +1;if(MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {? ? ? ? ? ? ? ? ? curRow.add(rs.getBytes(j));? ? ? ? ? ? ? }elseif(fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||? ? ? ? ? ? ? ? ? ? ? fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL -256)) {// field type is unsigned byte// ensure that do not use scientific notation formatBigDecimal val = rs.getBigDecimal(j);? ? ? ? ? ? ? ? ? curRow.add(StringUtil.encode(val !=null? val.toPlainString() :null, sc.getCharset()));? ? ? ? ? ? ? }else{? ? ? ? ? ? ? ? ? curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));? ? ? ? ? ? ? }? ? ? ? ? }? ? ? ? ? curRow.packetId = ++packetId;? ? ? ? ? byteBuf = curRow.write(byteBuf, sc,false);? ? ? ? ? byteBuf.flip();byte[] row =newbyte[byteBuf.limit()];? ? ? ? ? byteBuf.get(row);? ? ? ? ? byteBuf.clear();this.respHandler.rowResponse(row,this);? ? ? }? ? ? fieldPks.clear();// row eofeofPckg =newEOFPacket();? ? ? eofPckg.packetId = ++packetId;? ? ? byteBuf = eofPckg.write(byteBuf, sc,false);? ? ? byteBuf.flip();? ? ? eof =newbyte[byteBuf.limit()];? ? ? byteBuf.get(eof);? ? ? sc.recycle(byteBuf);this.respHandler.rowEofResponse(eof,this);? }finally{if(rs !=null) {try{? ? ? ? ? ? ? rs.close();? ? ? ? ? }catch(SQLException e) {? ? ? ? ? }? ? ? }if(stmt !=null) {try{? ? ? ? ? ? ? stmt.close();? ? ? ? ? }catch(SQLException e) {? ? ? ? ? }? ? ? }? }}// MongoResultSet.java@OverridepublicStringgetString(String columnLabel)throwsSQLException{? Object x = getObject(columnLabel);if(x ==null) {returnnull;? }returnx.toString();}

當(dāng)返回字段值是 Object 時,返回該對象.toString()。例如:

mysql>select*fromuserorderby_idasc;+--------------------------+------+-------------------------------+| _id? ? ? ? ? ? ? ? ? ? ? | name | profile? ? ? ? ? ? ? ? ? ? ? |+--------------------------+------+-------------------------------+|1|123| {"age":1,"height":100} |

4. 插入操作

// MongoSQLParser.javapublicintexecuteUpdate()throwsMongoSQLException{if(statementinstanceofSQLInsertStatement) {returnInsertData((SQLInsertStatement) statement);? }if(statementinstanceofSQLUpdateStatement) {returnUpData((SQLUpdateStatement) statement);? }if(statementinstanceofSQLDropTableStatement) {returndropTable((SQLDropTableStatement) statement);? }if(statementinstanceofSQLDeleteStatement) {returnDeleteDate((SQLDeleteStatement) statement);? }if(statementinstanceofSQLCreateTableStatement) {return1;? }return1;}privateintInsertData(SQLInsertStatement state){if(state.getValues().getValues().size() ==0) {thrownewRuntimeException("number of? columns error");? }if(state.getValues().getValues().size() != state.getColumns().size()) {thrownewRuntimeException("number of values and columns have to match");? }? SQLTableSource table = state.getTableSource();? BasicDBObject o =newBasicDBObject();inti =0;for(SQLExpr col : state.getColumns()) {? ? ? o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));? ? ? i++;? }? DBCollection coll =this._db.getCollection(table.toString());? coll.insert(o);return1;}

5. 彩蛋

1、支持多 MongoDB ,并使用 MyCAT 進(jìn)行分片。

MyCAT 配置:multi_mongodb

2、支持 MongoDB + MySQL 作為同一個 MyCAT Table 的數(shù)據(jù)節(jié)點。查詢時,可以合并數(shù)據(jù)結(jié)果。

查詢時,返回 MySQL 數(shù)據(jù)記錄字段要比 MongoDB 數(shù)據(jù)記錄字段全,否則,合并結(jié)果時會報錯。

MyCAT 配置:single_mongodb_mysql

3、MongoDB 作為數(shù)據(jù)節(jié)點時,可以使用 MyCAT 提供的數(shù)據(jù)庫主鍵字段功能。

MyCAT 配置:single_mongodb

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容