RxJS 核心操作符详细用法示例
1. Observable 详细用法
Observable 是 RxJS 的核心概念,代表一个可观察的数据流。
创建和订阅 Observable
import { Observable } from "rxjs";// 1. 创建Observable
const myObservable = new Observable(subscriber => {// 发出三个值subscriber.next('第一个值');subscriber.next('第二个值');subscriber.next('第三个值');// 模拟异步操作setTimeout(() => {subscriber.next('异步值');subscriber.complete(); // 完成流}, 1000);// 可选的清理逻辑return () => {console.log('Observable被取消订阅');};
});// 2. 订阅Observable
const subscription = myObservable.subscribe({next: value => console.log('收到值:', value),error: err => console.error('发生错误:', err),complete: () => console.log('流已完成')
});// 3. 取消订阅 (通常在组件销毁时调用)
setTimeout(() => {subscription.unsubscribe();
}, 2000);/* 输出顺序:
收到值: 第一个值
收到值: 第二个值
收到值: 第三个值
(等待1秒)
收到值: 异步值
流已完成
(再等待1秒)
Observable被取消订阅
*/
2. of 操作符详细用法
of
用于创建一个会立即发出给定参数的 Observable。
基本示例
import { of } from "rxjs";// 发出固定值
of('苹果', '香蕉', '橙子').subscribe({next: fruit => console.log('水果:', fruit),complete: () => console.log('水果列表结束')
});/* 输出:
水果: 苹果
水果: 香蕉
水果: 橙子
水果列表结束
*/// 发出不同类型的数据
of('字符串',123,true,{name: 'Alice'},[1, 2, 3],function hello() { return 'world'; }
).subscribe(val => console.log('收到的值:', val));// 实际应用:模拟API返回
function mockApiCall() {return of({id: 1, name: '用户1'});
}mockApiCall().subscribe(user => {console.log('用户数据:', user);
});
3. from 操作符详细用法
from
可以将多种数据类型转换为 Observable。
各种来源的转换
import { from } from "rxjs";// 1. 从数组创建
from([10, 20, 30]).subscribe(num => console.log('数字:', num));// 2. 从Promise创建
const promise = fetch('https://api.example.com/data').then(response => response.json());from(promise).subscribe(data => {console.log('API数据:', data);
});// 3. 从字符串创建 (每个字符作为单独的值)
from('Hello').subscribe(char => console.log(char));
// 输出: H, e, l, l, o// 4. 从Map或Set创建
const myMap = new Map();
myMap.set('name', 'Alice');
myMap.set('age', 25);from(myMap).subscribe(entry => {console.log('Map条目:', entry);// 输出: ['name', 'Alice'], ['age', 25]
});// 5. 实际应用:批量处理数组
const userIds = [1, 2, 3, 4];from(userIds).subscribe(id => {console.log('处理用户ID:', id);// 这里可以调用API获取每个用户的详细信息
});
4. forkJoin 操作符详细用法
forkJoin
用于并行执行多个 Observable,等待它们全部完成。
完整示例
import { forkJoin, of, from, throwError } from "rxjs";
import { delay, catchError } from "rxjs/operators";// 模拟API函数
function getUser(id) {return of({ id, name: `用户${id}` }).pipe(delay(1000));
}function getUserPosts(userId) {const posts = [{ id: 1, title: '帖子1' },{ id: 2, title: '帖子2' }];return of(posts).pipe(delay(1500));
}function getUserComments(userId) {return from(fetch(`https://api.example.com/users/${userId}/comments`));
}// 1. 基本用法
forkJoin([getUser(1),getUserPosts(1),getUserComments(1).pipe(catchError(error => of(`获取评论失败: ${error.message}`)))
]).subscribe({next: ([user, posts, comments]) => {console.log('用户:', user);console.log('帖子:', posts);console.log('评论:', comments);},error: err => console.error('整体失败:', err),complete: () => console.log('所有请求完成')
});// 2. 对象形式更清晰
forkJoin({user: getUser(1),posts: getUserPosts(1),comments: getUserComments(1).pipe(catchError(error => of([])) // 错误时返回空数组)
}).subscribe({next: result => {console.log('整合结果:', result);// 结构: { user: {...}, posts: [...], comments: [...] }}
});// 3. 错误处理演示
forkJoin({success: of('成功'),failure: throwError(new Error('出错了'))
}).pipe(catchError(error => {console.log('捕获到错误:', error);return of({ success: null, failure: error.message });})
).subscribe(result => {console.log('最终结果:', result);
});// 4. 实际应用:并行请求多个API
function loadDashboardData() {return forkJoin({user: getUser(1),notifications: from(fetch('/api/notifications')),settings: from(fetch('/api/settings'))});
}loadDashboardData().subscribe(data => {console.log('仪表盘数据:', data);// 更新UI...
});
综合实战示例
import { forkJoin, from, of } from "rxjs";
import { map, mergeMap, catchError } from "rxjs/operators";// 模拟API服务
class ApiService {static getUsers() {const users = [{ id: 1, name: 'Alice' },{ id: 2, name: 'Bob' }];return of(users).pipe(delay(500));}static getUserDetails(userId) {const details = {1: { age: 25, email: 'alice@example.com' },2: { age: 30, email: 'bob@example.com' }};return of(details[userId]).pipe(delay(300));}static getUserPosts(userId) {const posts = {1: [{ id: 101, title: 'Alice的第一篇帖子' }],2: [{ id: 201, title: 'Bob的帖子' }, { id: 202, title: 'Bob的另一篇帖子' }]};return of(posts[userId] || []).pipe(delay(700));}
}// 1. 获取所有用户及其详细信息和帖子
ApiService.getUsers().pipe(mergeMap(users => {// 为每个用户创建请求数组const userRequests = users.map(user => forkJoin({details: ApiService.getUserDetails(user.id),posts: ApiService.getUserPosts(user.id)}).pipe(map(data => ({ ...user, ...data }))));// 并行执行所有用户请求return forkJoin(userRequests);})
).subscribe({next: completeUsers => {console.log('完整用户数据:', completeUsers);/* 输出:[{id: 1,name: 'Alice',details: { age: 25, email: 'alice@example.com' },posts: [{ id: 101, title: 'Alice的第一篇帖子' }]},{id: 2,name: 'Bob',details: { age: 30, email: 'bob@example.com' },posts: [{...}, {...}]}]*/},error: err => console.error('获取用户数据失败:', err)
});// 2. 实际应用:表单提交后并行更新多个资源
function updateResources(userData, postsData, settingsData) {return forkJoin({user: from(fetch('/api/user', {method: 'PUT',body: JSON.stringify(userData)})),posts: from(fetch('/api/posts', {method: 'POST',body: JSON.stringify(postsData)})),settings: from(fetch('/api/settings', {method: 'PATCH',body: JSON.stringify(settingsData)}))}).pipe(map(responses => ({user: responses.user.json(),posts: responses.posts.json(),settings: responses.settings.json()})));
}// 使用示例
updateResources({ name: '新名字' },[{ title: '新帖子' }],{ theme: 'dark' }
).subscribe({next: results => {console.log('所有资源更新成功:', results);},error: err => {console.error('更新失败:', err);// 显示错误提示}
});
这些示例展示了 RxJS 操作符在实际开发中的典型用法。关键点:
Observable
是基础,代表数据流of
用于创建简单的同步流from
用于从各种数据源创建流forkJoin
用于并行执行多个 Observable 并合并结果