ETL Design for Importing Amazon S3 Files into Salesforce Objects, with Import Status Log Management
Use the Salesforce Bulk API to import Amazon S3 files into the corresponding Salesforce objects. There are four Salesforce environments, dev, qa, uat, and prod, each mapped to a different Salesforce instance. Design an ETL on AWS that imports S3 files into the Salesforce object of the specified target environment, and writes the successfully imported records and the failed records into two separate directories in another S3 bucket; each directory name includes the Salesforce object name, the Run ID, and the environment name (dev, qa, uat, or prod). Then write a PySpark job that reads all of these logs and overwrites the corresponding Hive table for each Salesforce object on AWS EMR; in addition to the imported data fields, the table contains the success/failure status, the environment name, the Run ID, and the creation time of the corresponding log directory.
Technology Stack Design
- Data storage:
  - Amazon S3 (raw data / log storage)
  - Hive on AWS EMR (structured data warehouse)
- Salesforce integration:
  - Salesforce Bulk API (high-volume operations)
  - Simple-Salesforce Python library (API calls)
- Compute engine:
  - PySpark (distributed data processing)
- Infrastructure:
  - AWS EMR (Spark/Hive cluster)
  - AWS IAM (access control)
- Supporting tools:
  - Boto3 (AWS resource operations)
  - Airflow/Lambda (job scheduling, optional)
Implementation Flow
Phase 1: S3 → Salesforce Data Import
- Read environment configuration
  - From secure storage (e.g. AWS Secrets Manager), retrieve for each of the four environments:
    - Salesforce login credentials
    - Instance URL (e.g. xxx.my.salesforce.com)
    - S3 log path template
  (A minimal lookup sketch follows below.)
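A minimal sketch of this lookup, assuming one secret per environment named sf/<env> (a hypothetical naming convention) whose value is a JSON document holding the credentials and the log path template:

import json
import boto3

def load_env_config(env: str) -> dict:
    # Assumes a secret named 'sf/<env>' (e.g. 'sf/dev') containing JSON such as:
    # {"instance_url": "...", "user": "...", "pwd": "...", "token": "...",
    #  "log_path_template": "s3://log-bucket/..."}
    client = boto3.client("secretsmanager")
    secret = client.get_secret_value(SecretId=f"sf/{env}")
    return json.loads(secret["SecretString"])

# Example: qa_config = load_env_config("qa")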
- Bulk API processing flow (pseudocode)

for env in [dev, qa, uat, prod]:
    # Initialize the Salesforce connection for this environment
    sf = Salesforce(instance_url=env_instance_url, ...)
    # Create a Bulk job for the target object
    job = sf.bulk.<object_name>.insert(...)
    # Split the data into batch tasks
    batch_results = []
    for batch in split_data(s3_file):
        batch_status = sf.bulk.job.add_batch(job, batch)
        batch_results.append(batch_status)
    # Monitor job status
    while not all_batches_done(batch_results):
        check_batch_statuses()
        sleep(30)
    # Collect results
    success, errors = process_results(batch_results)
    # Write result logs to S3
    write_logs_to_s3(
        success_logs=success,
        error_logs=errors,
        env=env,
        run_id=run_id,
        s3_bucket='log-bucket'
    )
- Log storage structure (a small listing helper follows the tree below)
s3://log-bucket/
├─ success/
│  ├─ Contact/run_id=20240301_1234/env=dev/created_time=202403011200/
│  ├─ Account/run_id=20240301_1235/env=qa/created_time=202403011201/
├─ failure/
│  ├─ Contact/run_id=20240301_1234/env=dev/created_time=202403011200/
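As a hedged helper (the bucket name log-bucket is taken from the tree above), the run directories written for one object can be enumerated with Boto3, for example to verify what a run produced:

import boto3

def list_log_runs(object_name, status="success", bucket="log-bucket"):
    # Returns the run_id=... prefixes written under <status>/<object_name>/
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    prefixes = []
    for page in paginator.paginate(Bucket=bucket,
                                   Prefix=f"{status}/{object_name}/",
                                   Delimiter="/"):
        prefixes += [p["Prefix"] for p in page.get("CommonPrefixes", [])]
    return prefixes

# Example: list_log_runs("Contact") -> ['success/Contact/run_id=20240301_1234/', ...]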
Phase 2: S3 Logs → Hive
PySpark processing logic:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract

def log_to_hive():
    spark = (SparkSession.builder
             .appName("SFLog2Hive")
             .enableHiveSupport()
             .getOrCreate())
    # Read every log partition (success and failure) in one pass;
    # mergeSchema reconciles the differing columns of each Salesforce object
    log_df = (spark.read.format("parquet")
              .option("mergeSchema", "true")
              .load("s3://log-bucket/{success,failure}/*/run_id=*/env=*/created_time=*/"))
    # Parse the metadata encoded in the file path
    path_pattern = r"/(success|failure)/(\w+)/run_id=([^/]+)/env=([^/]+)/created_time=([^/]+)/"
    enriched_df = (log_df
        .withColumn("status", regexp_extract(input_file_name(), path_pattern, 1))
        .withColumn("object_name", regexp_extract(input_file_name(), path_pattern, 2))
        .withColumn("run_id", regexp_extract(input_file_name(), path_pattern, 3))
        .withColumn("env", regexp_extract(input_file_name(), path_pattern, 4))
        .withColumn("created_time", regexp_extract(input_file_name(), path_pattern, 5)))
    # Overwrite the Hive table, partitioned by object and environment
    (enriched_df.write.mode("overwrite")
        .partitionBy("object_name", "env")
        .saveAsTable("salesforce_import_logs"))
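A hedged usage example: once log_to_hive() has populated the table, failed records can be summarized per run directly from Hive (table and column names as defined above):

def failed_rows_by_run(spark, object_name):
    # Summarize failed rows per run for one Salesforce object
    return spark.sql(f"""
        SELECT env, run_id, created_time, COUNT(*) AS failed_rows
        FROM salesforce_import_logs
        WHERE status = 'failure' AND object_name = '{object_name}'
        GROUP BY env, run_id, created_time
        ORDER BY created_time DESC
    """)

# failed_rows_by_run(spark, "Contact").show()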
Key Code Implementation
1. Core Bulk API operation code
import csv
import io
import boto3
from simple_salesforce import Salesforce

def process_sf_import(s3_path, object_name, env_config):
    # Sandbox environments (dev/qa/uat) may additionally require domain='test'
    sf = Salesforce(
        instance_url=env_config['instance_url'],
        username=env_config['user'],
        password=env_config['pwd'],
        security_token=env_config['token']
    )
    # Read the CSV data from S3 (s3_path is expected in the form s3://bucket/key)
    bucket, key = s3_path.replace("s3://", "").split("/", 1)
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    records = list(csv.DictReader(io.StringIO(body.decode("utf-8"))))
    # Submit through the Bulk API via simple-salesforce's public interface;
    # it splits the records into batches and polls the job until all complete
    results = getattr(sf.bulk, object_name).insert(records, batch_size=10000)
    # Results come back in input order, each with a success flag and error details
    success_records, failed_records = [], []
    for record, result in zip(records, results):
        entry = {**record, "sf_id": result.get("id"), "errors": str(result.get("errors"))}
        (success_records if result.get("success") else failed_records).append(entry)
    return success_records, failed_records
2. S3 log writer
import boto3
import pandas as pd
from io import BytesIO
from datetime import datetime

def write_logs_to_s3(logs, object_name, env, run_id, log_type, bucket='log-bucket'):
    s3 = boto3.resource('s3')
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # The key layout must match what the PySpark job parses:
    # <success|failure>/<object>/run_id=.../env=.../created_time=.../data.parquet
    key = (
        f"{log_type}/"
        f"{object_name}/"
        f"run_id={run_id}/"
        f"env={env}/"
        f"created_time={timestamp}/"
        "data.parquet"
    )
    # Convert the log records to Parquet in memory and upload
    df = pd.DataFrame(logs)
    buffer = BytesIO()
    df.to_parquet(buffer)
    s3.Object(bucket, key).put(Body=buffer.getvalue())
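A minimal end-to-end sketch tying the pieces together; load_env_config is the Secrets Manager sketch from Phase 1, and the run-id format and input path are illustrative assumptions:

from datetime import datetime

def run_import(env, object_name, s3_path):
    # Run one import and persist both result sets as logs
    run_id = datetime.now().strftime("%Y%m%d_%H%M")   # assumed run-id format
    config = load_env_config(env)                     # sketch from Phase 1 (assumption)
    success, failed = process_sf_import(s3_path, object_name, config)
    if success:
        write_logs_to_s3(success, object_name, env, run_id, log_type="success")
    if failed:
        write_logs_to_s3(failed, object_name, env, run_id, log_type="failure")

# Example (hypothetical input file):
# run_import("dev", "Contact", "s3://data-bucket/contact/contact_20240301.csv")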
System Optimization Recommendations
- Incremental processing: filter by run_id / timestamp when loading into the Hive table to avoid a full overwrite
- Error retry: implement exponential-backoff retries for failed Bulk API batches (see the sketch after this list)
- Metadata caching: use the Glue Data Catalog to automatically discover the S3 log schema
- Security hardening: use AWS STS AssumeRole for cross-account access control
- Monitoring: integrate CloudWatch to track API call counts and ETL latency
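A minimal sketch of the retry idea (the submit_batch callable and the caught exception type are placeholders; narrow them to whatever the Bulk API client actually raises):

import random
import time

def with_backoff(submit_batch, max_attempts=5, base_delay=2.0):
    # Call submit_batch(), retrying with exponential backoff plus jitter
    for attempt in range(1, max_attempts + 1):
        try:
            return submit_batch()
        except Exception:  # replace with the specific Bulk API error type
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, 1))

# Example: results = with_backoff(lambda: getattr(sf.bulk, "Contact").insert(batch))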