当前位置: 首页 > news >正文

产品做国外网站有哪些/网站排行榜前十名

产品做国外网站有哪些,网站排行榜前十名,微信公众号怎么发布作品,网站怎么做认证吗📌Python自定义消费Kafka至HDFS 当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --&…

📌Python自定义消费Kafka至HDFS

当Flume消费Kafka出现问题间隔很长时间才发现,此时需要将历史未消费的数据,通过Python脚本重新消费并写入到指定位置,之后在数仓建表等操作,具体代码如下【Kafka --> Python脚本 --> HDFS】


from kafka import KafkaConsumer
import json
from datetime import datetime
import time
from hdfs import InsecureClient##### 自定义消费KAFKA 数据 #####def consume_kafka_topic(bootstrap_servers, topic_name):"""消费指定Kafka Topic并处理消息(TXT格式)参数:bootstrap_servers (str): Kafka集群地址topic_name (str): 要消费的Topic名称"""# 创建HDFS客户端(需替换实际Hadoop地址和用户名)hdfs_client = InsecureClient('http://xxxxxxxxx:9870/', user='xxx')# 定义时间范围(毫秒级时间戳)start_time = int(datetime(2025, 3, 1).timestamp() * 1000)end_time = int(datetime(2025, 3, 31).timestamp() * 1000)# 创建Kafka消费者consumer = KafkaConsumer(topic_name,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',enable_auto_commit=True,group_id='test-group4',value_deserializer=lambda x: json.loads(x.decode('utf-8')))buffer = []count = 0file_num = 1  # 文件序号print(f"开始消费Topic: {topic_name},时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")try:for message in consumer:# 提取消息时间(假设消息中的time字段已经是毫秒时间戳)msg_time = message.value.get('time', 0)# print(f"msg_time:{msg_time}")# print(f"msg_time:{msg_time}")# 时间过滤(注意:原始代码中的+28800可能需要根据实际情况调整时区)if start_time <= msg_time < end_time:# 将整个JSON对象转为字符串json_str = json.dumps(message.value, ensure_ascii=False) + "\n"  # 添加换行符buffer.append(json_str)count += 1# 达到30万条时写入文件if count >= 300000:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入文件: hdfs://{hdfs_path} | 消息数: {count}")# 重置计数器和缓冲区buffer = []count = 0file_num += 1except KeyboardInterrupt:print("\n用户中断消费,正在保存剩余数据...")finally:# 保存剩余消息if count > 0:filename = f"kafka_data_{file_num}.txt"hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"# 写入HDFShdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)print(f"已写入剩余文件: hdfs://{hdfs_path} | 消息数: {count}")# 清理资源consumer.close()hdfs_client.close()print("消费任务完成")if __name__ == "__main__":# 配置参数KAFKA_SERVERS = "xxxxxxxx:9092,xxxxxxx:9092,xxxxxxx:9092"TARGET_TOPIC = "mqtt_drive"# 执行消费函数consume_kafka_topic(KAFKA_SERVERS, TARGET_TOPIC)

http://www.whsansanxincailiao.cn/news/30277686.html

相关文章:

  • 网站空间租赁500mb/网络推销平台有哪些
  • 计算机网站建设知识/济南优化网站的哪家好
  • 购物网站首页分成几个模块/手机seo关键词优化
  • 企业网站建设应遵守的原则/佛山关键词排名效果
  • 在线教育做网站好还是app好/seo新方法
  • 怎样做触屏版手机网站/住房和城乡建设部官网
  • 站长统计app进入网址/aso优化违法吗
  • wordpress连接小程序/讯展网站优化推广
  • logomaker在线设计生成/谷歌搜索优化
  • 男女做受网站/郑州网站优化排名
  • 南宁专业网站制作公司/做网站用什么软件好
  • 滁州建设局网站/搜索引擎优化排名技巧
  • 如何做外贸独立网站/餐饮管理和营销方案
  • 深圳网站开发建设服务公司/seo 排名 优化
  • 品牌网站定制开发/什么叫网络营销
  • 台州市建设规划局网站6/新闻今天的最新新闻
  • 江宁网站建设案例/郑州网站建设推广
  • 杭州网站程序开发公司/最近三天的新闻大事摘抄
  • 姜堰网网站/互联网营销的方式有哪些
  • 成功案例展示网站/网络服务有哪些
  • 怎样开设网站/成都新闻最新消息
  • 淘客自己的网站怎么做/搜狗整站优化
  • wordpress页面中添加小工具/百度seo网站优化 网络服务
  • 长沙圭塘网站建设公司/网站运营主要做什么
  • 有没有可以做各种字体的网站/关键词优化分析工具
  • 日本软银集团总资产/北京seo技术交流
  • 许昌正规网站优化公司/百度推广公司哪家比较靠谱
  • 南京百度网站建设/网站推广去哪家比较好
  • 网站建设方案百度文库/最近一周的重大热点新闻
  • 大学生网站建设方案/什么是互联网推广