win7基于MongoDB的仓储实现
简单的介绍一下,我使用MongoDB的场景。
我们现在的物联网环境下,有部分数据,采样频率为2000条记录/分钟,这样下来一天24*60*2000=2880000约等于300万条数据,以后必然还会增加。之前数据库使用的是mssql,对于数据库的压力很大,同时又需要保证历史查询的响应速度,这种情况下,在单表中数据量大,同时存在读写操作。不得已采用MongoDB来存储数据。如果使用MongoDB,则至少需要三台机器,两台实现读写分离,一台作为仲裁(当然条件不允许也可以不用),每台机器的内存暂时配置在16G,公司小,没办法,据说,使用这个MongoDB需要机器内存最少92G,我没有验证过,但是吃内存是公认的,所以内存绝对要保证,就算保证了,也不一定完全就没有意外发生。我们上面的这些特殊的数据是允许少量的丢失的,这些只是做分析使用的,几个月了,暂时还没出现数据丢失的情况,可能最新版本早就修复了吧,新手使用建议多看下官网上的说明。下面直接奔入主题:
一、安装部署和配置环境
1.安装部署mongo-server(V3.4)
参考 点击这里进入
这个时候不要启动,接着配置config文件
2.配置Config文件
dbpath=C:/Program Files/MongoDB/Server/3.4/bin/data/db logpath=C:/Program Files/MongoDB/Server/3.4/bin/data/log/master.log pidfilepath=C:/Program Files/MongoDB/Server/3.4/bin/master.pid directoryperdb=true logappend=true replSet=testrs bind_ip=10.1.5.25 port=27016 oplogSize=10000 noauth = true storageEngine = wiredTiger wiredTigerCacheSizeGB = 2 syncdelay = 30 wiredTigerCollectionBlockCompressor = snappy
以上是详细的配置参数,其中路径部分根据需要更改, 这里设置的oplogsize大小为10G,根据业务场景进行调整,另外auth权限为null,因为设置权限会增加服务开销,影响效率,最下面几行是内存引擎,可以控制副本集同步及内存限制,防止内存泄露。
3.启动mongo-server
4.添加副本集配置
conf= { "_id" : "testrs", "members" : [ { "_id" : 0, "host" : "10.1.5.25:27016" }, { "_id" : 1, "host" : "10.1.5.26:27016" }, { "_id" : 2, "host" : "10.1.5.27:27016" } ] } rs.initiate(conf)
此时副本集集群配置已经完成,然后在命令行中输入:rs.status(),查看副本集状态,需要查看同步情况,可以输入命令:db.serverStatus().
5.设置副本集可读写
Rs.slaveOk()
6..NET操作mongo
连接设置,请参考个人封装Unitoon.Mongo代码所示。
7.性能对比
读写速度:Redis>Mongo>Mssqlserver
可容纳数据量:Mssqlserver~Mongo>Redis
存储数据类型:Mongo>Mssqlserver>Redis
Note:内存持续上升,内部没有内存回收机制,若限制内存 ,则可能出现查询速度变慢,数据丢失等问题,建议优化查询效率,建立索引
Db.test.ensureIndex({"username":1, "age":-1})
强制释放内存命令:db.runCommand({closeAllDatabases:1})
二、仓储设计
1.基类BaseEntity
namespace
UnitoonIot.Mongo
{
/// <summary>
/// 实体基类,方便生成ObjId
/// </summary>
[Serializable]
[ProtoContract(ImplicitFields = ImplicitFields.AllPublic)]
//[ProtoInclude(10, typeof(NormalHistory))]
public
class
BaseEntity
{
//[BsonRepresentation(BsonType.ObjectId)]
public
ObjectId Id {
get
;
set
; }
/// <summary>
/// 数据库名称
/// </summary>
public
string
DbName {
get
;
set
; }
/// <summary>
/// 给对象初值
/// </summary>
public
BaseEntity()
{
// this.ObjId = ObjectId.GenerateNewId().ToString();
//this.Id = ObjectId.NewObjectId().ToString();
}
}
}
这里需要注意时间格式,MongoDB默认时间格式为国际时间,所以在写入数据时和读取数据时,时间格式要一致,此例中没有对时间进行特殊处理,由传入的时间格式确定。
2.Repository继承接口IMongoRepository
namespace UnitoonIot.Mongo { public interface IMongoRepositorywhere TEntity : class { } } 3.MongoRepository
using
MongoDB.Driver;
using
MongoDB.Bson;
using
System;
using
System.Collections.Generic;
using
System.Linq;
using
System.Linq.Expressions;
using
System.Text;
using
System.Threading.Tasks;
using
MongoDB.Bson.Serialization.Attributes;
using
MongoDB.Driver.Linq;
using
System.Configuration;
using
System.IO;
using
UnitoonIot.AppSetting;
namespace
UnitoonIot.Mongo
{
public
class
MongoDb
{
private
static
string
ConnectionStringHost ;
private
static
string
UserName ;
private
static
string
Password;
private
static
IMongoDatabase _db =
null
;
private
static
readonly
object
LockHelper =
new
object
();
/// <summary>
/// mongodb初始化
/// </summary>
public
static
void
Init()
{
ConnectionStringHost =
"10.1.5.24:27016,10.1.5.24:27016,10.1.5.26:27017"
;
//AppSettings.GetConfigValue("MongoHost");//"10.1.5.24:27016";
UserName = AppSettings.GetConfigValue(
"MongoUserName"
);
Password = AppSettings.GetConfigValue(
"MongoPwd"
);
}
static
MongoDb()
{
}
public
static
IMongoDatabase GetDb(
string
dbName,
string
options=
null
)
{
if
(_db !=
null
)
return
_db;
lock
(LockHelper)
{
if
(_db !=
null
)
return
_db;
var database = dbName;
var userName = UserName;
var password = Password;
var authentication =
string
.Empty;
var host =
string
.Empty;
if
(!
string
.IsNullOrWhiteSpace(userName))
{
authentication =
string
.Concat(userName,
':'
, password,
'@'
);
}
if
(!
string
.IsNullOrEmpty(options) && !options.StartsWith(
"?"
))
{
options =
string
.Concat(
'?'
, options);
}
host =
string
.IsNullOrEmpty(ConnectionStringHost) ?
"localhost"
: ConnectionStringHost;
database = database ??
"testdb"
;
//mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostN[:portN]]][/[database][?options]]
var conString = options!=
null
? $
"mongodb://{authentication}{host}/{database}{options}"
: $
"mongodb://{authentication}{host}/{database}"
;
var url =
new
MongoUrl(conString);
var mcs = MongoClientSettings.FromUrl(url);
mcs.MaxConnectionLifeTime = TimeSpan.FromMilliseconds(1000);
var client =
new
MongoClient(mcs);
_db = client.GetDatabase(url.DatabaseName);
}
return
_db;
}
}
/// <summary>
/// MongoDb 数据库操作类
/// </summary>
public
class
MongoRepository<T>: IMongoRepository<T> where T : BaseEntity
{
#region readonly field
/// <summary>
/// 表名
/// </summary>
private
readonly
IMongoCollection<T> _collection =
null
;
/// <summary>
/// 数据库对象
/// </summary>
private
readonly
IMongoDatabase _database;
#endregion
/// <summary>
/// 构造函数
/// </summary>
public
MongoRepository()
{
this
._database = MongoDb.GetDb(Activator.CreateInstance<T>().DbName,
"readPreference =secondaryPreferred "
);
//primaryPreferred/secondaryPreferred/nearest
_collection = _database.GetCollection<T>(
typeof
(T).Name);
}
#region 增加
/// <summary>
/// 插入对象
/// </summary>
/// <param name="t">插入的对象</param>
public
virtual
T Insert(T t)
{
// var flag = ObjectId.GenerateNewId();
// t.GetType().GetProperty("Id").SetValue(t, flag);
//t.Time = DateTime.Now;
_collection.InsertOne(t);
return
t;
}
/// <summary>
/// 批量插入
/// </summary>
/// <param name="ts">要插入的对象集合</param>
public
virtual
IEnumerable<T> InsertBatch(IEnumerable<T> ts)
{
_collection.InsertMany(ts);
return
ts;
}
/// <summary>
/// 插入对象
/// </summary>
/// <param name="t">插入的对象</param>
public
virtual
void
InsertAsync(T t)
{
//var flag = ObjectId.GenerateNewId();
// t.GetType().GetProperty("Id").SetValue(t, flag);
// t.Time = DateTime.Now;
_collection.InsertOneAsync(t);
}
/// <summary>
/// 批量插入
/// </summary>
/// <param name="ts">要插入的对象集合</param>
public
virtual
void
InsertBatchAsync(IEnumerable<T> ts)
{
_collection.InsertManyAsync(ts);
}
#endregion
#region 删除
/// <summary>
/// 删除
/// </summary>
/// <returns></returns>
public
virtual
long
Delete(T t)
{
var filter = Builders<T>.Filter.Eq(
"Id"
, t.Id);
var result = _collection.DeleteOne(filter);
return
result.DeletedCount;
}
/// <summary>
/// 删除
/// </summary>
/// <returns></returns>
public
virtual
void
DeleteAsync(T t)
{
var filter = Builders<T>.Filter.Eq(
"Id"
, t.Id);
_collection.DeleteOneAsync(filter);
}
/// <summary>
/// 按条件表达式删除
/// </summary>
/// <param name="predicate">条件表达式</param>
/// <returns></returns>
public
virtual
long
Delete(Expression<Func<T,
bool
>> predicate)
{
var result = _collection.DeleteOne(predicate);
return
result.DeletedCount;
}
/// <summary>
/// 按条件表达式删除
/// </summary>
/// <param name="predicate">条件表达式</param>
/// <returns></returns>
public
virtual
void
DeleteAsync(Expression<Func<T,
bool
>> predicate)
{
_collection.DeleteOneAsync(predicate);
}
/// <summary>
/// 按条件表达式批量删除
/// </summary>
/// <param name="predicate">条件表达式</param>
/// <returns></returns>
public
virtual
long
DeleteBatch(Expression<Func<T,
bool
>> predicate)
{
var result = _collection.DeleteMany(predicate);
return
result.DeletedCount;
}
/// <summary>
/// 按条件表达式批量删除
/// </summary>
/// <param name="predicate">条件表达式</param>
/// <returns></returns>
public
virtual
void
DeleteBatchAsync(Expression<Func<T,
bool
>> predicate)
{
_collection.DeleteManyAsync(predicate);
}
/// <summary>
/// 按检索条件删除
/// 建议用Builders<T>构建复杂的查询条件
/// </summary>
/// <param name="filter">条件</param>
/// <returns></returns>
public
virtual
long
Delete(FilterDefinition<T> filter)
{
var result = _collection.DeleteOne(filter);
return
result.DeletedCount;
}
/// <summary>
/// 按检索条件删除
/// 建议用Builders<T>构建复杂的查询条件
/// </summary>
/// <param name="filter">条件</param>
/// <returns></returns>
public
virtual
void
DeleteAsync(FilterDefinition<T> filter)
{
_collection.DeleteOneAsync(filter);
}
#endregion
#region 修改
/// <summary>
/// 修改(Id不变)
/// </summary>
/// <returns></returns>
public
virtual
long
Update(T t)
{
var filterBuilder = Builders<T>.Filter;
var filter = filterBuilder.Eq(
"Id"
,t.Id);
var update = _collection.ReplaceOne(filter, t,
new
UpdateOptions() { IsUpsert =
true
});
return
update.ModifiedCount;
}
/// <summary>
/// 修改(Id不变)
/// </summary>
/// <returns></returns>
public
virtual
void
UpdateAsync(T t)
{
var filterBuilder = Builders<T>.Filter;
var filter = filterBuilder.Eq(
"Id"
, t.Id);
_collection.ReplaceOneAsync(filter, t,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 用新对象替换新文档
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="t">对象</param>
/// <returns>修改影响文档数</returns>
public
virtual
long
Update(Expression<Func<T,
bool
>> filter, T t)
{
var update = _collection.ReplaceOne(filter, t,
new
UpdateOptions() { IsUpsert =
true
});
return
update.ModifiedCount;
}
/// <summary>
/// 用新对象替换新文档
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="t">对象</param>
/// <returns>修改影响文档数</returns>
public
virtual
long
Update(FilterDefinition<T> filter, T t)
{
var update = _collection.ReplaceOne(filter, t,
new
UpdateOptions() { IsUpsert =
true
});
return
update.ModifiedCount;
}
/// <summary>
/// 用新对象替换新文档
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="t">对象</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateAsync(Expression<Func<T,
bool
>> filter, T t)
{
_collection.ReplaceOneAsync(filter, t,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 用新对象替换新文档
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="t">对象</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateAsync(FilterDefinition<T> filter, T t)
{
_collection.ReplaceOneAsync(filter, t,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 根据Id和条件文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="id">对象Id</param>
/// <returns>修改影响文档数</returns>
public
virtual
long
Update(
string
id, UpdateDefinition<T> update)
{
var filterBuilder = Builders<T>.Filter;
var filter = filterBuilder.Eq(
"Id"
,
new
ObjectId(id));
var result = _collection.UpdateOne(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
return
result.ModifiedCount;
}
/// <summary>
/// 根据Id和条件文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="id">对象Id</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateAsync(
string
id, UpdateDefinition<T> update)
{
var filterBuilder = Builders<T>.Filter;
var filter = filterBuilder.Eq(
"Id"
,
new
ObjectId(id));
_collection.UpdateOneAsync(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 根据条件修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
Update(UpdateDefinition<T> update,Expression<Func<T,
bool
>> filter)
{
_collection.UpdateOne(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 根据条件修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
long
Update(UpdateDefinition<T> update, FilterDefinition<T> filter)
{
var result = _collection.UpdateOne(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
return
result.ModifiedCount;
}
/// <summary>
/// 根据条件修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateAsync(UpdateDefinition<T> update, Expression<Func<T,
bool
>> filter)
{
_collection.UpdateOneAsync(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 根据条件修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateAsync(UpdateDefinition<T> update, FilterDefinition<T> filter)
{
_collection.UpdateOneAsync(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 根据条件批量修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
long
UpdateBatch(UpdateDefinition<T> update, Expression<Func<T,
bool
>> filter)
{
var result = _collection.UpdateMany(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
return
result.ModifiedCount;
}
/// <summary>
/// 根据条件批量修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
long
UpdateBatch(UpdateDefinition<T> update, FilterDefinition<T> filter)
{
var result = _collection.UpdateMany(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
return
result.ModifiedCount;
}
/// <summary>
/// 根据条件批量修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateBatchAsync(UpdateDefinition<T> update, Expression<Func<T,
bool
>> filter)
{
_collection.UpdateManyAsync(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
}
/// <summary>
/// 根据条件批量修改文档
/// </summary>
/// <param name="update">修改条件-形如:Builders/<T/>.Update.Set(filed, value)</param>
/// <param name="filter">查询条件Builders/<T/>.Filter.Eq(filed, value)</param>
/// <returns>修改影响文档数</returns>
public
virtual
void
UpdateBatchAsync(UpdateDefinition<T> update, FilterDefinition<T> filter)
{
_collection.UpdateManyAsync(filter, update,
new
UpdateOptions() { IsUpsert =
true
});
}
#endregion
#region 查询
#region GetCollection
/// <summary>
/// 获取操作对象的IMongoCollection集合,强类型对象集合
/// </summary>
/// <returns></returns>
public
virtual
IMongoCollection<T> GetCollection()
{
return
_database.GetCollection<T>(
typeof
(T).Name);
}
#endregion
#region GetSingle
/// <summary>
/// 查询数据库,检查是否存在指定ID的对象
/// </summary>
/// <param name="id">对象的ID值</param>
/// <returns>存在则返回指定的对象,否则返回Null</returns>
public
virtual
T GetById(
string
id)
{
var filterBuilder = Builders<T>.Filter;
var filter = filterBuilder.Eq(
"Id"
,
new
ObjectId(id));
var data = _collection.Find(filter).FirstOrDefault();
return
data;
}
/// <summary>
/// 查询数据库,检查是否存在指定ID的对象
/// </summary>
/// <param name="id">对象的ID值</param>
/// <returns>存在则返回指定的对象,否则返回Null</returns>
public
virtual
async Task<T> GetAsyncById(
string
id)
{
var filterBuilder = Builders<T>.Filter;
var filter = filterBuilder.Eq(
"Id"
,
new
ObjectId(id));
var data = await _collection.FindAsync(filter);
return
await data.SingleOrDefaultAsync();
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="filter">过滤条件</param>
/// <returns></returns>
public
virtual
T Get(FilterDefinition<T> filter)
{
return
_collection.Find(filter).FirstOrDefault();
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="filter">条件表达式</param>
/// <returns></returns>
public
virtual
T Get(Expression<Func<T,
bool
>> filter)
{
return
_collection.Find(filter).FirstOrDefault();
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="filter">过滤条件</param>
/// <returns></returns>
public
virtual
async Task<T> GetAsync(FilterDefinition<T> filter)
{
var data = await _collection.FindAsync(filter);
return
await data.SingleOrDefaultAsync();
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="filter">条件表达式</param>
/// <returns></returns>
public
virtual
async Task<T> GetAsync(Expression<Func<T,
bool
>> filter)
{
var data = await _collection.FindAsync(filter);
return
await data.SingleOrDefaultAsync();
}
#endregion
#region GetMany
/// <summary>
/// 查询部分数据
/// </summary>
/// <param name="filter">过滤条件</param>
/// <returns></returns>
public
virtual
IEnumerable<T> GetMany(FilterDefinition<T> filter)
{
return
_collection.Find(filter).ToEnumerable();
}
/// <summary>
/// 查询部分数据
/// </summary>
/// <param name="filter">条件表达式</param>
/// <returns></returns>
public
virtual
IEnumerable<T> GetMany(Expression<Func<T,
bool
>> filter)
{
//return _collection.AsQueryable().Where(filter).ToList();
//return _collection.AsQueryable().Where(filter);
return
_collection.Find(filter).ToEnumerable();
//.ToEnumerable();
}
/// <summary>
/// 查询部分数据
/// </summary>
/// <param name="filter">过滤条件</param>
/// <returns></returns>
public
virtual
async Task<IEnumerable<T>> GetManyAsync(FilterDefinition<T> filter)
{
var data = await _collection.FindAsync(filter);
return
await data.ToListAsync();
}
/// <summary>
/// 查询部分数据
/// </summary>
/// <param name="filter">过滤条件</param>
/// <returns></returns>
public
virtual
async Task<IEnumerable<T>> GetManyAsync(Expression<Func<T,
bool
>> filter)
{
var data = await _collection.FindAsync(filter);
return
await data.ToListAsync();
}
#endregion
#region GetAll
/// <summary>
/// 查询所有记录,复杂查询直接用Linq处理(避免全表扫描)
/// </summary>
/// <returns>要查询的对象</returns>
public
virtual
IEnumerable<T> GetAll()
{
var data = _collection.AsQueryable();
return
data.ToEnumerable();
}
/// <summary>
/// 查询所有记录,复杂查询直接用Linq处理(避免全表扫描)
/// </summary>
/// <returns>要查询的对象</returns>
public
virtual
async Task<IEnumerable<T>> GetAllAsync()
{
var data = _collection.AsQueryable();
return
await data.ToListAsync();
}
/// <summary>
/// 查询所有记录,复杂查询直接用Linq处理(避免全表扫描)
/// </summary>
/// <returns>要查询的对象</returns>
public
virtual
IQueryable<T> GetAllQueryable()
{
return
_collection.AsQueryable();
}
#endregion
#region MapReduce
/// <summary>
/// MapReduce
/// </summary>
/// <returns>返回一个List列表数据</returns>
public
IEnumerable<T> GetMap(BsonJavaScript map,BsonJavaScript reduce)
{
return
_collection.MapReduce<T>(map,reduce).ToList();
}
#endregion
#endregion
}
}
关于“win7基于MongoDB的仓储实现” 的详细内容,小编都一一给大家整理出来了,对此你还有不懂的,欢迎随时来网站留言。关注爱站技术频道网站一定不会让你失望。
上一篇:mongodb主从复制的内容
下一篇:Mongodb认证鉴权的详细介绍