高性能代码优化实战与解析
SUMMARY
在实际软件开发中,性能优化的手段非常多样,最常见的往往是业务逻辑、数据结构和算法层面的改进。本文主要聚焦于计算密集型场景下的一些底层优化方法,包括循环展开、SIMD、循环合并、分支去除和数据预取等。这些技术并不是通用的“万能钥匙”,而是针对特定类型的性能瓶颈,在合适的场景下可以带来显著提升。
它们主要适用于对算力、内存访问和指令执行效率要求较高的场景,比如网络包处理、批量数据计算、底层库开发等。对于大多数业务系统,算法和数据结构的选择往往对性能影响更大。
现代 CPU 是高度并行和深度乱序(Out-of-Order)的计算机器,一些性能损耗常常来源于:
- 指令执行受限于控制相关(分支预测失败、循环控制过多)
- 数据访问带来的长延迟(L1≈4 cycles,L2≈12,L3≈40,内存≈200+ cycles)
- 执行单元吞吐未被充分利用(指令级并行度不足)
- Pipeline flush(错误预测 / 异常)
本文将就以下目标进行实践分析:
优化方向 | 目标 | 核心效果 |
---|---|---|
循环展开 | 减少分支/循环控制开销,提高指令级并行度 | 减少 branch 次数,提高吞吐 |
SIMD | 批量数据并行 | 单次处理多元素,显著提升吞吐 |
循环合并 | 减少遍历次数 / 提高缓存局部性 | 降低总 loop overhead,提升 Cache 命中 |
分支去除 | 降低预测失败成本 | 避免 pipeline flush |
数据预取 | 隐藏内存访问延迟 | 将长延迟与有效计算重叠 |
在实际工程中,性能提升往往来自“消除瓶颈 + 提升资源利用率”而非“写更花哨的代码”。
案例一:循环展开
1 朴素版本:按字节查找字符串终止符
size_t strlen_naive(const char *s) {size_t i = 0;while (1) {if (s[i] != '\0') {i++;} else {break;}}return i;
}
问题:
- 每次迭代只检查 1 字节
- 每次都要执行一次循环控制与分支判断
- 分支预测稳定但循环开销占比大(短字符串时更明显)
2 手工循环展开 x4
size_t strlen_unrolled(const char *s) {const char *p = s;while (1) {if (!p[0]) return p - s;if (!p[1]) return p - s + 1;if (!p[2]) return p - s + 2;if (!p[3]) return p - s + 3;p += 4;}
}
展开后:
指标 | naive | unrolled x4 |
---|---|---|
循环迭代次数 | N | N / 4 |
数据检查次数 | N | N |
循环控制判断 | N | N / 4 |
收益来源:
- 更少的循环控制分支
- 提供更多可并行的指令窗口(OOO 更容易重排)
- 但代码尺寸增加,极端展开会引入 i-cache 压力
3 SIMD + 展开(AVX2)
size_t strlen_simd_avx2(const char *s) {const char *p = s;__m256i zero = _mm256_setzero_si256(); // 全0向量while (1) {__m256i chunk = _mm256_loadu_si256((const __m256i*)p); //一次取256位32字节的数据__m256i cmp = _mm256_cmpeq_epi8(chunk, zero); // 逐个字节比较是否为0,如果有0,cmp的对应字节为0XFFunsigned mask = _mm256_movemask_epi8(cmp); // 把32字节的cmp每个字节的最高位拿出来拼接位32位maskif (mask != 0) {// 找到第一个 '\0' 的位置int offset = __builtin_ctz(mask); // count trailing zerosreturn (p - s) + offset;}p += 32;}
}
原理:
- 利用专用指令一次载入 32 字节
- 逐字节并行比较是否为 0
_mm256_movemask_epi8
将每字节最高位打包成 32bit mask__builtin_ctz
快速定位第一个匹配位置
进一步说明:
- GCC/Clang 内置
strlen
已经使用高度优化(包含 SIMD+位技巧),自实现仅用于学习。 - 对齐(aligned load)可再优化,但需要边界处理。
4 性能对比
随机生成10万个字符串,分别使用普通循环,循环展开和SIMD进行压力测试,结果如下:
Naive => time: 41.1273 ms, CheckSum=99414070
Unrolled => time: 30.1666 ms, CheckSum=99414070
SIMD+Unrolled(AVX2) => time: 16.7002 ms, CheckSum=99414070
什么时候不要盲目展开
- 代码尺寸变大导致 i-cache miss (L1 i-cache指令缓存通常只有几十KB)
- 循环体复杂(函数调用、异常处理、不可预测分支)
5 测试代码
#include <iostream>
#include <chrono>
#include <vector>
#include <string>
#include <cstdlib>
#include <ctime>
#include <immintrin.h> // AVX2 intrinsicsusing namespace std;
// g++ -O3 -mavx2 -std=c++17 test_strlen.cpp -o test_strlen// g++ -O3 -mavx2 -std=c++17 -S test_strlen.cpp -o test_strlen.s// 普通逐字节扫描版本
size_t strlen_naive(const char *s) {size_t i = 0;while (1) {if (s[i] != '\0') { i++;} else {break; // 找到 '\0',退出循环}}return i;
}// 循环展开版本
size_t strlen_unrolled(const char *s) {const char *p = s;while (1) {if (!p[0]) return p - s;if (!p[1]) return p - s + 1;if (!p[2]) return p - s + 2;if (!p[3]) return p - s + 3;p += 4;}
}// SIMD + 循环展开 (AVX2, 一次检查32个字节)
size_t strlen_simd_avx2(const char *s) {const char *p = s;__m256i zero = _mm256_setzero_si256(); // 初始化一个 256位的全0向量while (1) {__m256i chunk = _mm256_loadu_si256((const __m256i*)p); //一次取256位32字节的数据__m256i cmp = _mm256_cmpeq_epi8(chunk, zero); // 逐个字节比较是否为0,如果有0,cmp的对应字节为0XFFunsigned mask = _mm256_movemask_epi8(cmp); // 把32字节的cmp每个字节的最高位拿出来拼接位32位maskif (mask != 0) {// 找到第一个 '\0' 的位置int offset = __builtin_ctz(mask); // count trailing zerosreturn (p - s) + offset;}// 提前判断是否快到末尾,地址低12位表示页内偏移,p+32的页内偏移小于32,说明是到下一页了。if ((((uintptr_t)(p + 32)) & 0xFFF) < 32) {break; // 可能跨越页面,安全起见,退出}p += 32;}// 用标量方法处理最后不足 32 字节while (*p != '\0') {p++;}return p - s;
}// 测试函数
template<typename F>
long long benchmark(const string &name, F func, const vector<string> &data, int repeat) {auto start = chrono::high_resolution_clock::now();size_t total = 0;for (int r = 0; r < repeat; r++) {for (auto &s : data) {total += func(s.c_str()); // 调用函数}}auto end = chrono::high_resolution_clock::now();chrono::duration<double, milli> dur = end - start;cout << name << " => time: " << dur.count() << " ms, CheckSum=" << total << endl;return (long long)dur.count();
}int main() {srand((unsigned)time(nullptr));// 生成测试数据const int N = 100000; // 字符串数量const int LEN = 200; // 每个字符串的长度vector<string> data;data.reserve(N);for (int i = 0; i < N; i++) {string s(LEN, 'a');s[rand() % LEN] = '\0'; // 在随机位置插入 '\0'data.push_back(s);}int repeat = 10;cout << "Benchmark start...\n";benchmark("Naive", strlen_naive, data, repeat);benchmark("Unrolled", strlen_unrolled, data, repeat);benchmark("SIMD+Unrolled(AVX2)", strlen_simd_avx2, data, repeat);return 0;
}
案例二:循环合并
1 原始代码
void two_loops(int *a, int *b, size_t n) {for (size_t i = 0; i < n; i++) a[i] += 1;for (size_t i = 0; i < n; i++) b[i] += 2;
}
2 合并后
void fused_loop(int *a, int *b, size_t n) {for (size_t i = 0; i < n; i++) {a[i] += 1;b[i] += 2;}
}
收益:
- 循环控制减半
- 若
a
和b
访问模式相关(或能并行预取),可改善缓存占用 - 更容易让编译器做指令调度与向量化
3 实测对比
O2 优化下,合并循环有 15% 左右的提升, O3优化下性能接近。
但是大部分工程是使用更安全的O2优化,这里的合并循环还是有意义的。
ckun@node-1:~/ws/high$ g++ -O2 loopf.cpp -o loop
ckun@node-1:~/ws/high$ ./loop
two_loops : 80.4236 ms
fused_loop: 67.3819 ms
checksum: 6
ckun@node-1:~/ws/high$ g++ -O3 loopf.cpp -o loop
ckun@node-1:~/ws/high$ ./loop
two_loops : 56.4573 ms
fused_loop: 54.0639 ms
注意不能合并的情形:
- 存在数据依赖(第二个循环依赖第一个结果)
- 会破坏局部性(例如第一段已经超出缓存,合并后反而导致工作集增大)
4 测试代码
#include <bits/stdc++.h>
using namespace std;constexpr size_t N = 100'000'000; // 大数组,超过 L3 cache// ---------- 两次循环 ----------
void two_loops(int *a, int *b, size_t n) {for (size_t i = 0; i < n; i++) a[i] += 1;for (size_t i = 0; i < n; i++) b[i] += 2;
}// ---------- 合并循环 ----------
void fused_loop(int *a, int *b, size_t n) {for (size_t i = 0; i < n; i++) {a[i] += 1;b[i] += 2;}
}int main() {vector<int> a(N, 1), b(N, 2);// ---------- 两次循环 ----------auto t1 = chrono::high_resolution_clock::now();two_loops(a.data(), b.data(), N);auto t2 = chrono::high_resolution_clock::now();cout << "two_loops : "<< chrono::duration<double, milli>(t2 - t1).count()<< " ms\n";// 重置数组fill(a.begin(), a.end(), 1);fill(b.begin(), b.end(), 2);// ---------- 合并循环 ----------auto t3 = chrono::high_resolution_clock::now();fused_loop(a.data(), b.data(), N);auto t4 = chrono::high_resolution_clock::now();cout << "fused_loop: "<< chrono::duration<double, milli>(t4 - t3).count()<< " ms\n";// 验证结果cout << "checksum: " << a[0] + b[0] << "\n";return 0;
}
案例三:分支去除
1 有分支版本
对一个数据包判断IP version的示例。
uint64_t process_with_branch(const std::vector<Mbuf>& pkts) {uint64_t sum = 0;for (auto &m : pkts) {if (m.ip_version == 4) {sum += m.payload * 2;} else {sum += m.payload * 3;}}return sum;
}
在数据模式稳定(例如 95% IPv4)时,分支预测非常准确 → 成本低。
在随机(50/50)模式下,预测失败率高 → 每次失败触发 pipeline flush(十几~二十几个 cycles 损耗)。
2 无分支
使用三目运算符的方式选择执行路径,编译器通常会把这个三目运算符优化成类似于 条件 move(cmov
) 或者 位掩码运算(blend)
uint64_t process_branchless(const std::vector<Mbuf>& pkts) {uint64_t sum = 0;for (auto &m : pkts) {sum += (m.ip_version == 4 ? (m.payload * 2ULL) : (m.payload * 3ULL));}return sum;
}
可能编译为:
- 计算两个路径结果
- 用
cmov
或按位掩码选择 - 避免跳转指令 → 消除预测风险
cmov
:如果条件满足,则把源寄存器的值移动到目标寄存器,否则不变。整个过程没有跳转指令(jmp),CPU流水线不会因为分支预测失败而被刷新。
blend
:在 SIMD 指令中,根据掩码选择每个元素来自哪一个向量,同样没有分支跳转。
3 实测对比
=== mode: predictable(95%) ===
branch : 55.9768 ms, sum=3358339556177
branchless : 50.3089 ms, sum=3358339556177=== mode: random(50%) ===
branch : 168.193 ms, sum=4095912943114
branchless : 50.331 ms, sum=4095912943114
经验:
数据分布 | 建议 |
---|---|
极不均匀(>90/10) | 传统分支可接受 |
均匀或不可预测 | 用 branchless(计算多一点换稳定低延迟) |
反例:
- 分支路径计算成本很大,分支较少时查表法反而没有if-else来得更快。
- 引入溢出 / side effect(所有路径都需要能安全执行)
4 测试代码
#include <iostream>
#include <vector>
#include <chrono>
#include <random>
#include <immintrin.h> // for SIMD intrinsics if needed// g++ -O3 -march=native -o dpdk_branch dpdk_branch.cpp// 模拟 mbuf
struct Mbuf {uint8_t ip_version; // 4 = IPv4, 6 = IPv6uint64_t payload; // 模拟负载
};// 模拟批量收包
constexpr int BURST = 32;
constexpr size_t N = 50'000'000;using Clock = std::chrono::high_resolution_clock;// 分支式版本
uint64_t process_with_branch(const std::vector<Mbuf>& pkts) {uint64_t sum = 0;for (auto &m : pkts) {if (m.ip_version == 4) { // IPv4 处理sum += m.payload * 2;} else { // IPv6 处理sum += m.payload * 3;}}return sum;
}// 分支消除版本(branchless)
uint64_t process_branchless(const std::vector<Mbuf>& pkts) {uint64_t sum = 0;for (auto &m : pkts) {sum += m.ip_version == 4? (m.payload * 2ULL) : (m.payload * 3ULL);}return sum;
}int main() {std::vector<Mbuf> pkts(N);std::mt19937_64 rng(42);// 生成 3 种模式的数据:predictable / random / alternatingauto run_mode = [&](std::string name, auto fill_func) {fill_func(pkts);auto t1 = Clock::now();auto s1 = process_with_branch(pkts);auto t2 = Clock::now();auto s2 = process_branchless(pkts);auto t3 = Clock::now();std::chrono::duration<double, std::milli> d1 = t2 - t1;std::chrono::duration<double, std::milli> d2 = t3 - t2;std::cout << "=== mode: " << name << " ===\n";std::cout << "branch : " << d1.count() << " ms, sum=" << s1 << "\n";std::cout << "branchless : " << d2.count() << " ms, sum=" << s2 << "\n\n";};// 95% IPv4 (predictable)run_mode("predictable(95%)", [&](auto &v) {std::uniform_real_distribution<> dist(0.0, 1.0);for (auto &m : v) {m.ip_version = (dist(rng) < 0.95) ? 4 : 6;m.payload = rng() & 0xFFFF;}});// 50% 随机 (worst case)run_mode("random(50%)", [&](auto &v) {std::uniform_int_distribution<int> dist(0, 1);for (auto &m : v) {m.ip_version = dist(rng) ? 4 : 6;m.payload = rng() & 0xFFFF;}});// // 交替 (4,6,4,6…)// run_mode("alternating", [&](auto &v) {// for (size_t i = 0; i < v.size(); i++) {// v[i].ip_version = (i & 1) ? 6 : 4;// v[i].payload = rng() & 0xFFFF;// }// });return 0;
}
案例四:数据预取(Prefetch)
1 原理
内存访问延迟巨大(数百 cycles)。CPU 通过:
- 硬件预取器(基于顺序/Stride)
- Out-of-Order 执行 来隐藏部分延迟。
当访问模式是“不可推断的指针链 / 间接寻址”时,硬件预取失效 → 手动预取有价值。
2 手动预取示例
static inline void rte_prefetch0(const volatile void *p) {asm volatile("prefetcht0 %[p]" :: [p] "m" (*(const volatile char*)p));
}uint64_t process_all(Node **nodes, size_t N) {constexpr size_t PREFETCH_OFFSET = 8;uint64_t sum = 0;for (size_t i = 0; i < N; i++) {if (i + PREFETCH_OFFSET < N) {rte_prefetch0(nodes[i + PREFETCH_OFFSET]->pkt);rte_prefetch0(nodes[i + PREFETCH_OFFSET]->pkt->payload);}sum += process_packet(nodes[i]->pkt);}return sum;
}
核心:
- 访问第 i 个元素时,预取 i+K
- 预取距离(Prefetch Distance, PD) ≈ memory_latency / per_iteration_compute_time
- 若 PD 太小:仍然 miss
- 若 PD 太大:被替换掉 / 竞争 cache
3 实测对比
ckun@node-1:~/ws/high$ ./prefetch_yes
manual prefetch : 29.7191 ms, sum=42916623131
ckun@node-1:~/ws/high$ ./prefetch_no
no prefetch : 37.6579 ms, sum=42916623131
4 perf 统计
指标 | 有预取 | 无预取 | 解释 |
---|---|---|---|
LLC-loads | 534,232 | 1,631,089 | 访问 L3 更少 |
LLC-load-misses | 233,113 | 639,220 | 更少走到内存 |
dTLB-load-misses | 207,386 | 369,418 | 访问更“前置”+缓存复用 |
perf详细输出
ckun@node-1:~/ws/high$ sudo perf stat -e cycles,instructions,branches,branch-misses,L1-dcache-loads,L1-dcache-load-misses,LLC-loads,LLC-load-misses,dTLB-loads,dTLB-load-misses ./prefetch_yes
manual prefetch : 30.1557 ms, sum=42916623131Performance counter stats for './prefetch_yes':4,099,670,535 cycles (39.80%)5,204,902,307 instructions # 1.27 insn per cycle (49.83%)1,192,741,921 branches (49.82%)2,509,612 branch-misses # 0.21% of all branches (49.83%)1,198,671,483 L1-dcache-loads (50.12%)10,786,191 L1-dcache-load-misses # 0.90% of all L1-dcache accesses (50.17%)534,232 LLC-loads (40.13%)233,113 LLC-load-misses # 43.64% of all LL-cache accesses (40.14%)1,256,545,559 dTLB-loads (40.14%)207,386 dTLB-load-misses # 0.02% of all dTLB cache accesses (39.84%)0.877323887 seconds time elapsed0.821256000 seconds user0.056085000 seconds sys
无预取
ckun@node-1:~/ws/high$ sudo perf stat -e cycles,instructions,branches,branch-misses,L1-dcache-loads,L1-dcache-load-misses,LLC-loads,LLC-load-misses,dTLB-loads,dTLB-load-misses ./prefetch_no
no prefetch : 37.2121 ms, sum=42916623131Performance counter stats for './prefetch_no':4,126,748,433 cycles (39.16%)5,135,334,207 instructions # 1.24 insn per cycle (49.30%)1,192,210,925 branches (49.73%)2,503,578 branch-misses # 0.21% of all branches (50.19%)1,211,017,835 L1-dcache-loads (50.65%)11,516,844 L1-dcache-load-misses # 0.95% of all L1-dcache accesses (50.70%)1,631,089 LLC-loads (40.55%)639,220 LLC-load-misses # 39.19% of all LL-cache accesses (40.13%)1,242,669,783 dTLB-loads (39.67%)369,418 dTLB-load-misses # 0.03% of all dTLB cache accesses (39.21%)0.868179350 seconds time elapsed0.804203000 seconds user0.064016000 seconds sys
5 适用场景
- 指针数组指向分散对象
- 链表 / B+树 / LSM-tree 节点扫描
- Packet buffer(网络包处理)
不适用:
- 线性数组顺序遍历(硬件已做得很好)
- Cache 尺寸本就能容纳工作集
- 访问间隔极不规律(预取命中率低)
6 测试代码
//#include <rte_prefetch.h>
#include <vector>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <random>
#include <cstdint>static inline void rte_prefetch0(const volatile void *p)
{asm volatile ("prefetcht0 %[p]" : : [p] "m" (*(const volatile char *)p));
}struct Packet {uint64_t header;uint8_t payload[1024]; // 扩大 payload
};// 链表节点,用于打乱访问顺序
struct Node {Packet* pkt;Node* next;
};// 模拟包处理函数
inline uint64_t process_packet(Packet* pkt) {uint64_t sum = pkt->header;for (int j = 0; j < 256; j++)sum += pkt->payload[j];return sum;
}
const int PREFETCH_OFFSET = 8;int test( std::vector<Node> nodes, int N)
{// ---------- 不预取 ----------// auto start1 = std::chrono::high_resolution_clock::now();// uint64_t sum1 = 0;// for (size_t i = 0; i < N; i++)// sum1 += process_packet(nodes[i].pkt);// auto end1 = std::chrono::high_resolution_clock::now();// ---------- 手动预取 ----------auto start2 = std::chrono::high_resolution_clock::now();uint64_t sum2 = 0;for (size_t i = 0; i < N; i++) {if (i + PREFETCH_OFFSET < N) {rte_prefetch0(nodes[i + PREFETCH_OFFSET].pkt);rte_prefetch0(nodes[i + PREFETCH_OFFSET].pkt->payload);}sum2 += process_packet(nodes[i].pkt);}auto end2 = std::chrono::high_resolution_clock::now();// std::cout << "no prefetch : "// << std::chrono::duration<double, std::milli>(end1 - start1).count()// << " ms, sum=" << sum1 << "\n";std::cout << "manual prefetch : "<< std::chrono::duration<double, std::milli>(end2 - start2).count()<< " ms, sum=" << sum2 << "\n";return 0;
}int main() {const size_t N = 1 << 18; // 256K packets// 初始化 Packetstd::vector<Packet> pkts(N);for (size_t i = 0; i < N; i++) {pkts[i].header = i;for (int j = 0; j < 256; j++)pkts[i].payload[j] = rand() % 256;}// 创建节点并打乱顺序std::vector<Node> nodes(N);for (size_t i = 0; i < N; i++)nodes[i].pkt = &pkts[i];// 打乱顺序std::mt19937 rng(12345);std::shuffle(nodes.begin(), nodes.end(), rng);test(nodes, N);return 0;
}