当前位置: 首页 > news >正文

阿里云网站备案好了 怎么建站/泉州关键词优化软件

阿里云网站备案好了 怎么建站,泉州关键词优化软件,公司网站建设找谁做,长春网站建设工作室目录 代码分析: 类定义: 成员变量: run 方法: cancel 方法: 时间戳和水位线: 代码拓展 增加事件类型: 动态调整事件生成频率: 增加事件过滤: 增加事件聚合: 增加事件重试机制: 增加事件分区: ​​package sourceimport java.util.Calendarimport org.…

目录

代码分析: 

类定义:

成员变量:

run 方法:

cancel 方法:

时间戳和水位线:

代码拓展

增加事件类型:

动态调整事件生成频率:

增加事件过滤:

增加事件聚合:

增加事件重试机制:

增加事件分区:


 

​​package sourceimport java.util.Calendarimport org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.watermark.Watermarkimport scala.util.Random/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: source* @author: 赵嘉盟-HONOR* @data: 2025-05-19 2:20* @DESCRIPTION**/
//class ClickSource extends SourceFunction[Event]{  //串行执行
class ClickSource extends ParallelSourceFunction[Event]{ //并行执行var Running=trueoverride def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {val random = new Random()val users=Array("Mary","Alice","Bob","Cary")val urls=Array("./home","./cart","./fav","./prod?id=1","./prod?id=2","./prod?id=3")while (Running){val event=Event(users(random.nextInt(users.length)),urls(random.nextInt(urls.length)),Calendar.getInstance.getTimeInMillis)
/*      //为要发送的数据分配时间戳sourceContext.collectWithTimestamp(event,event.timestamp)//像下游直接发送水位线sourceContext.emitWatermark(new Watermark(event.timestamp-1L))*/sourceContext.collect(event)Thread.sleep(1000)}}override def cancel(): Unit = {Running=false}
}

这段代码定义了一个自定义的 Flink 数据源 ClickSource,它实现了 ParallelSourceFunction[Event] 接口,用于生成模拟的用户点击事件流。以下是代码的详细解释:


代码分析: 

  1. 类定义:
    • ClickSource 类实现了 ParallelSourceFunction[Event] 接口,这意味着它是一个可以并行执行的 Flink 数据源。
    • Event 是一个自定义的数据类型,表示用户点击事件。
  2. 成员变量:
    • Running 是一个布尔变量,用于控制数据源的运行状态。当 Running 为 true 时,数据源会持续生成事件;当 Running 为 false 时,数据源停止生成事件。
  3. run 方法:
    • run 方法是 SourceFunction 的核心方法,用于生成数据并发送到 Flink 的数据流中。
    • random 是一个 Random 对象,用于生成随机数。
    • users 和 urls 是两个数组,分别存储了模拟的用户名和 URL。
    • 在 while 循环中,代码随机选择一个用户和一个 URL,生成一个 Event 对象,并将其发送到 Flink 的数据流中。
    • sourceContext.collect(event) 用于将生成的 Event 发送到下游。
    • Thread.sleep(1000) 用于控制事件生成的频率,这里设置为每秒生成一个事件。
  4. cancel 方法:
    • cancel 方法用于停止数据源的运行,将 Running 设置为 false
  5. 时间戳和水位线:
    • 注释掉的代码 sourceContext.collectWithTimestamp(event, event.timestamp) 和 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)) 用于为事件分配时间戳和发送水位线。水位线是 Flink 中用于处理事件时间的重要机制。

代码拓展

  1. 增加事件类型:
    • 可以扩展 Event 类,增加更多的事件类型,例如点击类型(如“浏览”、“购买”等),以便生成更复杂的事件流。
    case class Event(user: String, url: String, eventType: String, timestamp: Long)
  2. 动态调整事件生成频率:
    • 可以通过外部配置或动态参数来调整事件生成的频率,而不是硬编码 Thread.sleep(1000)
    val interval = 1000 // 可以从配置文件中读取
    Thread.sleep(interval)
  3. 增加事件过滤:
    • 可以在生成事件时增加过滤逻辑,例如只生成特定用户或特定 URL 的事件。
    if (event.user == "Mary") {sourceContext.collect(event)
    }
  4. 增加事件聚合:
    • 可以在生成事件时进行简单的聚合,例如统计每个用户的点击次数。
    val userClickCount = scala.collection.mutable.Map[String, Int]()
    userClickCount(event.user) = userClickCount.getOrElse(event.user, 0) + 1
  5. 增加事件序列化:

    • 如果需要将事件发送到 Kafka 或其他消息队列,可以增加事件的序列化逻辑。
    val serializedEvent = serialize(event)
    sourceContext.collect(serializedEvent)
  6. 增加事件重试机制:
    • 在发送事件时,可以增加重试机制,以应对网络或下游系统的故障。
    var retryCount = 0
    while (retryCount < 3) {try {sourceContext.collect(event)retryCount = 3} catch {case e: Exception =>retryCount += 1Thread.sleep(1000)}
    }
  7. 增加事件分区:
    • 如果需要将事件发送到不同的分区,可以根据事件的某些属性(如用户 ID)进行分区。
    val partition = event.user.hashCode % numPartitions
    sourceContext.collect(partition, event)

通过这些扩展,可以使 ClickSource 更加灵活和强大,适应不同的业务需求。

http://www.whsansanxincailiao.cn/news/31941012.html

相关文章:

  • 1688网站批发/google手机官网
  • 东莞网站建设 餐饮/舆情通
  • dede门户网站模板/厨师培训学校
  • 东莞模板建站哪家好/sem竞价课程
  • 婚恋网站模板下载/外贸商城建站
  • 网站投放广告怎么做/wordpress建站公司
  • 网站怎么做按钮/手机百度网盘网页版登录入口
  • 网站制作 杭州公司/优化课程
  • 西安做网站公司必达/网络营销公司排行榜
  • web网站开发里怎么切换界面/神马搜索seo优化排名
  • 昆明网站建设外包/东莞网络优化调查公司
  • 网站源码上传到空间以后怎么做/交换链接的其它叫法是
  • 做网站浏览器/网站优化seo怎么做
  • 山东嘉邦家居用品公司网站 加盟做经销商多少钱 有人做过吗/互联网营销培训平台
  • 网站群管理平台建设/合肥seo整站优化网站
  • 品牌网站建设流程图/重庆seo多少钱
  • 光谷网站建设公司/我是站长网
  • 网站集约化建设的总体情况/网站建设及推广优化
  • 网站排名不可有利就前/百度招商加盟推广
  • 群英云服务器/江阴网站优化公司
  • 网站是怎么做/肇庆seo按天收费
  • 怎么做亚马逊网站/产品推广图片
  • 外贸网站支付系统/网络优化排名培训
  • 建设网站的公司济南兴田德润o简介图片/优化什么意思
  • 建程网的工程可靠吗/南宁seo排名外包
  • 成都 做网站 模版/中国最新军事新闻最新消息
  • wordpress建设网站的方法/不受国内限制的浏览器下载
  • wordpress添加flash/郑州谷歌优化外包
  • 网站底部设计/店铺运营
  • 门户网站推广/上海全网营销推广