Through rivers, Elasticsearch can synchronize with a variety of data sources: Wikipedia, MongoDB, CouchDB, RabbitMQ, RSS, Sofa, JDBC, FileSystem, Dropbox, and so on. Our business runs on MongoDB, so today I set up synchronization between Elasticsearch and MongoDB on a test-environment VM. This is a rough record of the process, mainly using richardwilly98 / elasticsearch-river-mongodb.

The river synchronizes data by reading MongoDB's oplog, the collection MongoDB itself uses to keep the members of a replica set in sync; tailing it guarantees that the data in Elasticsearch matches the data in MongoDB. MongoDB therefore has to run as a cluster to have an oplog at all. Note: this plugin only supports MongoDB running as a replica set, because only a replica set has the oplog collection.
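As a quick sanity check (my addition, not part of the original steps), you can connect to MongoDB before creating the river and confirm that the node is a replica set member and that the oplog collection exists:

$ mongo --host 10.253.1.71 --port 27017
> rs.status()            # should report this node as a member of a replica set
> use local
> db.oplog.rs.findOne()  # the capped collection the river will tail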

Elasticsearch and MongoDB have to be at matching versions for the synchronization to work. Here I used Elasticsearch 1.4.2 and MongoDB 3.0.0; the corresponding version requirements are listed in the table below.

[elasticsearch-river-mongodb version compatibility table]

My MongoDB is a replica-set cluster; I will not go into the details of building the replica set here, and the installation and configuration of Elasticsearch are also omitted.
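For reference only, even a single-node replica set is enough to get an oplog; a minimal sketch, assuming the replica set name rs0 and example data/log paths:

$ mongod --replSet rs0 --dbpath /data/db --logpath /data/mongod.log --fork
$ mongo --port 27017 --eval 'rs.initiate()'   # run once to initialize the single-node replica set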

1. Install elasticsearch-river-mongodb

# ./elasticsearch-1.4.4/bin/plugin -install elasticsearch/elasticsearch-mapper-attachments/2.4.1

# ./elasticsearch-1.4.4/bin/plugin -i com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/2.0.5
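After installing both plugins, restart Elasticsearch so they are loaded. As an extra check of my own (assuming the --list option of the 1.x plugin script), you can list the installed plugins to confirm the installation went through:

# ./elasticsearch-1.4.4/bin/plugin --list     # mapper-attachments and the mongodb river should both be listed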

2. Create the river

curl -XPUT "http://10.253.1.70:9200/_river/threads_mongo_river/_meta" -d '

{
  "type": "mongodb",
  "mongodb": {
    "servers":
    [
      { "host": "10.253.1.71", "port": 27017}
    ],
    "db": "threads",
    "collection": "threads",
    "gridfs": false
  },
  "index": {
    "name": "test",
    "type": "threads"
  }
}'
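Before moving on, you can read the river document back from the _river index to confirm it was accepted (a check I added; the plugin also writes its status documents into the same _river index):

$ curl -XGET "http://10.253.1.70:9200/_river/threads_mongo_river/_meta?pretty"
$ curl -XGET "http://10.253.1.70:9200/_river/threads_mongo_river/_search?pretty"   # status documents written by the plugin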

This only configures the MongoDB server to connect to, the db and collection to read from, and the corresponding Elasticsearch index and type. The more detailed settings, such as options, are not used here and can be configured according to your business needs. Below is a full sample configuration:

$ curl -XPUT "localhost:9200/_river/${es.river.name}/_meta" -d '
{
  "type": "mongodb",
  "mongodb": {
    "servers":
    [
      { "host": ${mongo.instance1.host}, "port": ${mongo.instance1.port} },
      { "host": ${mongo.instance2.host}, "port": ${mongo.instance2.port} }
    ],
    "options": {
      "secondary_read_preference" : true,
      "drop_collection": ${mongo.drop.collection},
      "exclude_fields": ${mongo.exclude.fields},
      "include_fields": ${mongo.include.fields},
      "include_collection": ${mongo.include.collection},
      "import_all_collections": ${mongo.import.all.collections},
      "initial_timestamp": {
        "script_type": ${mongo.initial.timestamp.script.type},
        "script": ${mongo.initial.timestamp.script}
      },
      "skip_initial_import" : ${mongo.skip.initial.import},
      "store_statistics" : ${mongo.store.statistics},
    },
    "credentials":
    [
      { "db": "local", "user": ${mongo.local.user}, "password": ${mongo.local.password} },
      { "db": "admin", "user": ${mongo.db.user}, "password": ${mongo.db.password} }
    ],
    "db": ${mongo.db.name},
    "collection": ${mongo.collection.name},
    "gridfs": ${mongo.is.gridfs.collection},
    "filter": ${mongo.filter}
  },
  "index": {
    "name": ${es.index.name},
    "throttle_size": ${es.throttle.size},
    "bulk_size": ${es.bulk.size},
    "type": ${es.type.name}
    "bulk": {
      "actions": ${es.bulk.actions},
      "size": ${es.bulk.size},
      "concurrent_requests": ${es.bulk.concurrent.requests},
      "flush_interval": ${es.bulk.flush.interval}
    }
  }
}'

Some of the configuration options are explained below; see the plugin's GitHub wiki for the full details:

  • db: name of the database to synchronize
  • host: MongoDB IP address (defaults to localhost)
  • port: MongoDB port
  • collection: name of the collection to synchronize
  • fields: fields to synchronize (comma separated; defaults to all fields)
  • gridfs: whether this is a GridFS collection (set to true if the collection is GridFS)
  • local_db_user: username for the local database (omit if there is none)
  • local_db_password: password for the local database (omit if there is none)
  • db_user: username for the database being synchronized (omit if there is none)
  • db_password: password for the database being synchronized (omit if there is none)
  • name: index name (must not already exist)
  • type: type name
  • bulk_size: maximum number of documents per bulk operation
  • bulk_timeout: timeout for bulk operations
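If a river's configuration needs to change later (for example to add a filter or point it at a different index), the usual approach with ES 1.x rivers is to delete the river document and PUT the _meta document again; a sketch using the river name from this post:

$ curl -XDELETE "http://10.253.1.70:9200/_river/threads_mongo_river/"   # stop and remove the existing river

Then re-run the PUT from step 2 with the updated _meta body.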

3. Test whether the synchronization works

My test database only has a few documents, so I simply query them all to see whether they come back:

$ curl -XGET "http://10.253.1.70:9200/test/threads/_search?pretty"

{
    "took": 20,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
    },
    "hits": {
        "total": 4,
        "max_score": 1,
        "hits": [
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa32b22c44cf67cb6a9d1b",
                "_score": 1,
                "_source": {
                    "_id": "54fa32b22c44cf67cb6a9d1b",
                    "title": "where is my car",
                    "content": "ask yourself"
                }
            },
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa2f5c2c44cf67cb6a9d19",
                "_score": 1,
                "_source": {
                    "_id": "54fa2f5c2c44cf67cb6a9d19",
                    "title": "this is title",
                    "content": "what is the fuck"
                }
            },
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa2f892c44cf67cb6a9d1a",
                "_score": 1,
                "_source": {
                    "_id": "54fa2f892c44cf67cb6a9d1a",
                    "title": "are you ok",
                    "content": "yes,i am ok"
                }
            },
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa49ccc104e2264e02deea",
                "_score": 1,
                "_source": {
                    "_id": "54fa49ccc104e2264e02deea",
                    "title": "hello word",
                    "content": "hello hello haha"
                }
            }
        ]
    }
}

It looks like the data has been synchronized over. Now add a record in MongoDB and run the same query; if the new record shows up (or the total has increased by 1), the synchronization is working.
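For example (my own illustration), insert one more document on the MongoDB side and then ask Elasticsearch for the document count; within a few seconds the total should go from 4 to 5:

$ mongo --host 10.253.1.71 --port 27017 threads --eval 'db.threads.insert({title: "new thread", content: "inserted after the river was set up"})'
$ curl -XGET "http://10.253.1.70:9200/test/threads/_count?pretty"   # should now report "count": 5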
