问题:

  • 公司开了个新项目,算上我一共3个人。车间里机台通过流水线连通联动的玩意。一个管理控制系统连接各个机台和硬件。专机类型就有5种,个数差不多20个左右。

  • 软件规划的时候采用总分的结构,管理控制系统和专机子系统之间通过消息中间件通讯。本来也想TCP连接来着,但是开发时间不允许。而且每个系统都得写一遍这个玩意。

  • 消息中间件有很多个,比如 Kafka、RabbitMQ、RocketMQ等国内外的消息中间件。这些中间件无论宣称的多么轻量级都要啃一下,更要命的是就他娘三个人。而且后面还要这个鸡儿系统可复制。

  • 考虑到消息及时性、开发难易程度、维护简便性等因素后决定用Redis的pub/sub功能来实现.软件结构大概如类似结构。

可用性:

  • 作为消息通知属于安装了Redis就有的功能,因为Redis是用在系统中存储一些热数据,不用单独维护,在Windows中属于服务直接就开了。

  • 作为可以分布式集群使用的数据库,消息传递应该比较OK了。虽然使用的client-server,但是server-server已经很好了。料想client-server也不会差

  • 试验消息内容发送订阅的情况下,速度在30毫秒内,貌似可以。看其他博主说大于10K入队比较慢,但是可以不用入消息队列啊,用发布订阅。

  • .net 下一般使用ServiceStack.Redis,要命的是4.0以后收费,可以破解的但是不支持List<T>型的数据直接存取,想用只能变成JSON字符串存着。

  • 如果只是用订阅发布功能,不存储热数据或者不使用List<T>的数据可以使用4.0以上的版本。文末会贴上两个类型的下载包。想用其他的包也可以,我这里只说一种思路。

实现:

模块结构图展示如下

public static class MSServer
{
// 定义一个object对象
private static object objinstance = new object(); private static ServerState CurState = ServerState.Free; static PooledRedisClientManager prcm; private static string clientmake = string.Empty; /// <summary>
/// 连接的地址
/// </summary>
/// <param name="IP">地址127.0.0.1:6379</param>
/// <param name="rechannels">接收通道 {"channel:1-13","channel:1-5"}</param>
/// <returns></returns>
public static int OpenServer(string IP ,string[] rechannels)
{
try
{
if (prcm == null)
{
lock (objinstance)
{
if (prcm == null)
{
prcm = CreateManager(IP, IP);
CurState = ServerState.Init;
return CreateLink(rechannels);
}
}
}
}
catch
{
prcm = null;
CurState = ServerState.Free;
return -;
}
return ;
} private static int CreateLink(string[] SourceID)
{
if (CurState == ServerState.Init && SourceID.Length > )
{
try
{
using (IRedisClient Redis = prcm.GetReadOnlyClient())
{
clientmake = SourceID[];
var info = Redis.GetClientsInfo().Where(i => i["name"] == clientmake).ToList();
info.ForEach(i =>
{
Redis.KillClient(i["addr"]);
});
Redis.SetClient(clientmake);
IRedisSubscription sc = Redis.CreateSubscription();
Task.Run(() =>
{
try
{
sc.SubscribeToChannels(SourceID);
}
catch { }
});
sc.OnMessage += new Action<string, string>(showpub);
}
CurState = ServerState.Work;
}
catch
{
string message = string.Empty;
prcm = null;
CurState = ServerState.Free;
return -;
}
return ;
}
else
{
return ;
}
} public static Action<string, string> ReceiveMessage;
static void showpub(string channel, string message)
{
if (ReceiveMessage != null)
{
ReceiveMessage(channel, message);
}
} private static PooledRedisClientManager CreateManager(string writeHost, string readHost)
{
var redisClientConfig = new RedisClientManagerConfig
{
MaxWritePoolSize = ,//“写”链接池链接数
MaxReadPoolSize = ,//“读”链接池链接数
DefaultDb = ,
AutoStart = true,
};
//读的客户端只能接受特定的命令,不能用于发送信息
var RedisClientManager = new PooledRedisClientManager(
new string[] { writeHost }//用于写
, new string[] { readHost }//用于读
, redisClientConfig);
CurState = ServerState.Init; return RedisClientManager;
}
/// <summary>
/// 发送信息
/// </summary>
/// <param name="channel">通讯对象 "channel:1-13"</param>
/// <param name="meesage">发送信息 "test send "</param>
/// <returns>0 发送失败 1 发送成功 -1 连接损毁 检查网络后重建</returns>
public static long PubMessage(string channel, string meesage)
{
if (CurState == ServerState.Work)
{
if (!string.IsNullOrEmpty(channel) && !string.IsNullOrEmpty(meesage))
{
try
{
using (IRedisClient Redis = prcm.GetClient())
{
Redis.SetClient(clientmake);
return Redis.PublishMessage(channel, meesage);
}
}
catch
{
prcm = null;
CurState = ServerState.Free;
return -;
}
}
else
{
return ;
}
}
else
{
return -;
}
}
} public enum ServerState
{
Free,
Init,
Work,
Del
}

有一个问题,就是连接远程的服务器时如果网络断开再重连,会残留没用的client ,这样如果网络断断续续的话,会留好多没有清除的客户端。

这个在3.0.504版本中Redis 中也有这个问题,不知道是基于什么考虑的。所以需要建立连接的时候,给个客户端名称,再初始化的时候删掉所有同类型的名称。

使用的时候大概类似操作 textbox2.text = "channel:1-5" .为了简便发布的和监听的都是本地的一个通道。

private void button1_Click(object sender, EventArgs e)
{ //11.1.7.152 192.168.12.173
int result = ServerMS.MSServer.OpenServer("127.0.0.1:6379", new string[] { textBox2.Text });
label1.Text = result.ToString();
//1匿名事件
ServerMS.MSServer.ReceiveMessage += new Action<string, string>(fuck); if (result == )
{
//发送失败重新发送 检查 通道和字符串后重新发送
}
else if (result == )
{
//发送成功
}
else if (result == -)
{
//连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
} } void fuck(string channel, string message)
{
this.BeginInvoke(new Action(() =>
{
textBox4.Text = channel + message;
}));
}
public bool sdfsd = true; private void button3_Click(object sender, EventArgs e)
{long result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString("yyyyyMMddhhmmssfff")); if (result == )
{
//发送失败重新发送
}
else if (result == )
{
//发送成功
}
else if (result == -)
{
//连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
}
}

为了简便channel:是通道的固定命令 ,可以自定义channel:后面的内容,发送就有反馈。确保所有机台都接收到。

如果有断线的需要程序自己重连,接收通道的客户端不可以再给其他的使用,Redis上说Redis client 进入订阅模式时只能接受订阅发布等命令指令,不接受普通的存取和其他命令

所以如果需要在读取、写入、发布、执行其他的指令需要使用其他客户端,否则就出错了。跑了几天了上亿次的测试貌似没有出现什么问题。

发布订阅消息不会走AOF RDB只存在于内存中,即发即用,用完就没了。没在线就没了。需要考虑使用环境。

还用ping pong来确定连接状态,也可以自定义数据,使用场景要自己开发,要适合自己的才是好的。

下载:

4.0 dll

链接:https://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
提取码:js8p

5.8 dll不可以使用List<T>类型

链接:https://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g
提取码:bxh2