脚本宝典收集整理的这篇文章主要介绍了C++框架,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
thrift
在网络一节中简单介绍了thrift的协议部分,在工程中会用得到thrift的线程并发,PRocess,server库。定义idl后生成代码和业务编写代码的关系如下:
运行过程:
1.开启threaft线程池
a.主线程创建n个,开启,数量不够workerMonitor_.wait。到100个就死了(加锁,结束释放)
b.工作线程开启后,加锁,增加数量,workerMonitor_.notify
任务空monitor_.wait()
否则取任务,判断等待队列长度不到阈值则manager_->maxMonitor_.notify(),
释放锁。执行任务。结束后继续抢锁循环
2.开启nonblockingserver,io线程就绪
a.非0号其他线程先start,设置eventbase(iothread),createpipe,注册notify;
event_base_loop(eventBase_, 0);【无监听,每个io线程自己的event_base】
b.0号线程注册事件,设置eventbase(iothread);注册监听;createpipe,注册notify。
0号io线程run,开始监听。其他io线程join
3.0号监听到handleEvent
accept
加锁create connection(init状态)分配连接给io线程(轮询)释放锁,通知分配的线程notifyhandler
4.分配到连接的IO线程notifyhandler(read notifyfd,transition)
本次transition: init会加读事件setread,回调为workSocket,读取后继续给transition
继续循环到读取结束,调用addtask=>setidle,不需要监听cfd
5.addtask
thrift,加锁,如果tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout);
加入task队列,有空闲线程monitor_.notify()。任何一种monitor都公用一个锁。
这里的task就是process然后notifyIOThread(read notifyfd,transition)。
6.处理后通知IO线程
transition将cfd改为监听写事件,workSocket调用connenction的回调发送。
7.connenction的回调发送之后继续notifyIOThread 本次transition重置缓存区结束。
总结:多reactor多线程模式,一个accept,多个读写,单独任务处理。正常只需要一个reactor。单reactor多线程形式。
http_server
关于优雅重启
nginx这种多进程的比教好做,因为子进程可以独立于父进程。
主进程fork,继承监听fd,锁等,exec()执行完整代码。此时旧的子进程和新的子进程都可以抢锁监听fd处理连接,关闭旧主进程,给旧的子进程发送关闭信号,子进程在处理后才会监听到信号,做到了优雅。
线程没办法独立监听信号。
连接池
这里0是可用的。但是不要真的ping,否则代价太大,可以用read如果连接还在会发送EAGAIN错误则是连接中
add的就是任意连接对象。实现connect,reconnect.
比如
for (int i = 0; i < connectionCount; ++i) {
RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms);
redis->init();//CONNECT
redisPool_.add(redis);
}
改造的redis_pool
连接池+线程池+hiredis分别负责连接管理和并发请求处理。
封装目的:一般并发到分片获取数据的代理都有以下缺点:一个失败全部失败,要等所有返回才返回,而mget的失败会被放大。因此自己在业务层控制整个mget的超时时间和返回,到代理层已经拆分为当个get,用线程池实现。
spdLOG
-
业务调用
spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3));
std::string info_file = FLAGS_log_path + "/" + FLAGS_info_file
auto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str());
debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v");
inline std::shared_ptr<spdlog::logger> spdlog::create(const std::string& logger_name, Args... args)
{
sink_ptr sink = std::make_shared<Sink>(args...);
return details::registry::instance().create(logger_name, { sink });
/*锁控制 new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb); //这里启线程
_loggers[logger_name] = new_logger;*/
}
auto logger = spdlog::get("warn_logger");
if (logger != NULL) {
logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str());
}
info()=>log()->push_msg()
-
spdlog的push_msg就是enqueue
inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
{
if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
{
auto last_op_time = details::os::now();
auto now = last_op_time;
do
{
now = details::os::now();
sleep_or_yield(now, last_op_time);
}
while (!_q.enqueue(std::move(new_msg)));
}
}
-
spdlog每个日志都一个线程,启动后会循环等dequeue到落盘
_worker_thread(&async_log_helper::worker_loop, this)
while (active)
{
try
{
active = process_next_msg(last_pop, last_flush);
}
}
inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
{
async_msg incoming_async_msg;
if (_q.dequeue(incoming_async_msg))
{
for (auto &s : _sinks)
{
if (s->should_log(incoming_log_msg.level))
{
s->log(incoming_log_msg); //调用正常的文件读写。
}
}
}
else
{
auto now = details::os::now();
handle_flush_interval(now, last_flush);
sleep_or_yield(now, last_pop);
return !_terminate_requested;
}
}
-
无锁队列
bool enqueue(T&& data)
{
cell_t* cell;
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
if (dif == 0)
{
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
break;
}
else if (dif < 0)
{
return false;
}
else
{
pos = enqueue_pos_.load(std::memory_order_relaxed);
}
}
cell->data_ = std::move(data);
cell->sequence_.store(pos + 1, std::memory_order_release);
return true;
}
bool dequeue(T& data)
{
cell_t* cell;
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq =
cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
if (dif == 0)
{
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
break;
}
else if (dif < 0)
return false;
else
pos = dequeue_pos_.load(std::memory_order_relaxed);
}
data = std::move(cell->data_);
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
return true;
}
buffer 数组。每个有seq,data
enqueue seq前移pos+1
dequeue seq前移pos+1+mask 循环复用
memory_order_relaxed:不对执行顺序做保证
memory_order_acquire:本线程中,所有后续的读操作必须在本条原子操作完成后执行 memory_order_release:本线程中,所有之前的写操作完成后才能执行本条原子操作 a.COMpare_exchange_weak(n,w):比较a和n,如果相等,a赋值为w。不相等,n赋值为a,返回false
buffer {sequence,data} enqueue_pos 两个和cell中的值一直加1 dequeue_pos 同上
为何一个acquire一个relaxed呢? pos的CAS可以保证写的原子性。最低relaxed。能保证单独操作原子,保证不了顺序, 这种对顺序的限制性能一定比锁好吗? 这个只对单指令做限制,性能比锁好
以上是脚本宝典为你收集整理的C++框架全部内容,希望文章能够帮你解决C++框架所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。