首页域名资讯 正文

定时消息通知

2024-11-04 4 0条评论

1.1. 背景

系统的有些业务时需要定时发消息通知。但是这些消息又不是有规律可循的。比如,商品的优惠是限时的。在之前的实现是有一个排查任务每5分钟都去去商品表中查询哪些有做活动的商品,并比较是否过了限时折扣的时间。但是类似的排程多了,就会出现在某个时候数据库的资源使用率特别高。

1.2. 解决思路

1、将参与限时活动的商品保存在另外一张表。

2、使用消息队列机制,选择限时商品的时候将商品信息和限时的时间传入消息队列。

3、创建一个定时任务。

4、当时间到了定时任务就将在限时商品表删除此商品。

1.3. 定时消息任务的实现

这边就不去操作数据库了,就演示一下要如何实现这样的定时任务。同时也不演示kafka是如何搭建的,这边就直接用起来。

生产者代码

生产者代码主要实现了将 商品信息、数据库链接信息、定时时间传入kafka中。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #!/usr/bin/env python # -*- coding:utf-8 -*-    from pykafka import KafkaClient import simplejson as json import logging import time import sys    reload ( sys ) sys . setdefaultencoding ( ‘utf-8’ ) logging . basicConfig ( ) if __name__ == ‘__main__’ :    # 可接受多个Client这是重点    client = KafkaClient ( hosts = “192.168.137.12:9092” )       # 选择一个topic    topic = client . topics [ ‘test’ ]       # 创建一个生产者    producer = topic . get_producer ( )    producer . start ( )       # 生产消息    msg_dict = {      “sleep_time” : 10 ,      “db_config” : {        “database”    : “test” ,        “host”        : “192.168.137.12” ,        “user”        : “root” ,        “password”    : “root”      } ,      “table”      : “msg” ,      “msg”        : “Hello World”    }    msg = json . dumps ( msg_dict )    producer . produce ( msg )    producer . stop ( )

消费者代码

消费者代码中主要是实现了将接收的消息放入定时任务中(timer)。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #!/usr/bin/env python # -*- coding:utf-8 -*-    from pykafka . common import OffsetType from pykafka import KafkaClient from threading import Timer import simplejson as json import logging import sys    reload ( sys ) sys . setdefaultencoding ( ‘utf-8’ ) logging . basicConfig ( ) def func ( msg ) :      定时执行的任务   ‘    print msg    if __name__ == ‘__main__’ :    # 可接受多个Client这是重点    client = KafkaClient ( hosts = “192.168.137.12:9092” )       # 查看所有topic    client . topics       # 选择一个topic    topic = client . topics [ ‘test’ ]       # 使用这种一个topic只能允许一个consumer_group消费    balanced_consumer = topic . get_balanced_consumer (      consumer_group = ‘test_group1’ ,      auto_commit_enable = True ,      zookeeper_connect = ‘trustauth.cn:2181’    )       for message in balanced_consumer : if message is not None :    # 创建定时任务        timer = Timer ( 10 , func , args = [ message . value ] )        timer . start ( )

1.4. 补充

之前我说过消息信息需要存入数据库,这边的原因是主要是怕这个消费者程序奔溃重启是还能恢复。还有其实我们应该使用zookeeper开发分布式程序。当一个消费者程序崩溃了另外一个需要马上接进来(有兴趣的可以去研究一下kazoo,并实现分布式程序和leader选举)。

 

 

文章转载来自:trustauth.cn

文章版权及转载声明

本文作者:亿网 网址:https://www.edns.com/ask/post/163393.html 发布于 2024-11-04
文章转载或复制请以超链接形式并注明出处。