在传统开发中,总会有一种任务,它们一般无状态,独立运行并且耗时,于是我们不得不在将它们托付给其他进程进行处理。
但额外维护一个专门处理任务的程序麻烦且奢侈,它们并不总是在工作,反而需要你时刻关心它们空转时是不是占据了太多的服务器资源,或者是不是能够正确接受任务并处理任务。
至少在我的上一家公司里,一个核心的Django程序,附带启动了接近二十个不同的Celery进程,它们时不时的吞噬CPU和内存,令人头疼。虽然我认为是设计原因,但额外维持进程总是痛苦的。
阿里云的OSS和Serverless Function,给了我一个灵感。用它们组成一个分布式的任务处理系统岂不是美哉?(其实主要是因为我没钱,而这两都有很高的免费额度)
介绍
OSS(对象存储)
阿里云的对象存储服务,不仅能拿来存图片,还能存二进制数据(废话),是一个缺少功能的k-v数据库。
用它存任务队列,经济实惠,而且耐操。以阿里云的技术,OSS的读写性能足够供应绝大部分的业务需求。
Serverless Function
阿里云的无服务函数,提供了一个OSS触发器。当触发器所指定的OSS中被新建了一个对象,而对象的Key能匹配到此触发器,则会调用这个无服务函数。
使用它作为消费者,就不再需要考虑消费者集群是否存活,是否正常运行的问题。大大减少了运维成本。
并且让我这种穷人很心动的一点是无服务函数只有被调用才会计费,每个月有百万次的免费调用额度。一般中小型企业的用量也达不到百万次,对于个人开发者更是等于不要钱。
实践代码
Talk is cheap, show me the code.
下面以一个最简单的发送邮件为例。
预备步骤
首先肯定得把无服务函数与OSS服务开启,然后创建对应的两个RAM用户,分别拥有无服务函数的全部权限和OSS的全部权限。
然后建立一个OSS bucket,用于任务队列的读写。
编写代码
使用阿里云的fun工具,首先编写一个template.yml
用于一键部署。
ROSTemplateFormatVersion: '2015-09-01' Transform: 'Aliyun::Serverless-2018-04-03' Resources: SendMail: Type: 'Aliyun::Serverless::Function' Properties: Initializer: sendmail.init InitializationTimeout: 10 Handler: sendmail.handler Timeout: 60 Runtime: python3 CodeUri: './' EnvironmentVariables: OSS_BUCKET: 'your bucket name' OSS_KEYID: '' OSS_SECRET: '' OSS_LINK: 'https://oss-cn-hongkong.aliyuncs.com' EMAIL_SERVER_HOST: 'smtp server' EMAIL_SERVER_PORT: port EMAIL_USERNAME: 'your email username' EMAIL_PASSWORD: 'your email password' Events: SendMail: Type: OSS Properties: BucketName: your bucket name Events: - oss:ObjectCreated:* Filter: Key: Prefix: mail/ Suffix: .task
配置都是什么意思,不一一解释。关键的配置在于Events
。
这里配置了SendMail这个无服务函数监听一个OSS bucket的创建事件,并且Filter
中指定,只有Key是以mail/
开头,.task
结束的对象被创建时,才会触发此函数。
再写一个sendmail.py
作为无服务函数进行部署
import os import json import typing import pickle import hashlib import smtplib import logging from email.mime.text import MIMEText from email.header import Header import oss2 OSS_BUCKET = os.environ.get("OSS_BUCKET") OSS_KEYID = os.environ.get("OSS_KEYID") OSS_SECRET = os.environ.get("OSS_SECRET") OSS_LINK = os.environ.get("OSS_LINK") EMAIL_SERVER_HOST = os.environ.get("EMAIL_SERVER_HOST") EMAIL_SERVER_PORT = int(os.environ.get("EMAIL_SERVER_PORT")) EMAIL_USERNAME = os.environ.get("EMAIL_USERNAME") EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD") smtpObj = None def sendmail(nickname: str, to_email: str, subject: str, content: str) -> typing.Tuple[bool, typing.Optional[str]]: """发送邮件通知""" global smtpObj message = MIMEText(content, 'html', 'utf-8') message['From'] = Header(f"Trotter<{EMAIL_USERNAME}>", 'utf-8') message['To'] = Header(f'{nickname}<{to_email}>', 'utf-8') message['Subject'] = Header(subject, 'utf-8') try: smtpObj.sendmail(EMAIL_USERNAME, to_email, message.as_string()) return True, None except smtplib.SMTPException as e: return False, str(e) def init(context): global smtpObj smtpObj = smtplib.SMTP(EMAIL_SERVER_HOST, EMAIL_SERVER_PORT) smtpObj.starttls() smtpObj.login(EMAIL_USERNAME, EMAIL_PASSWORD) def handler(event, context): evt = json.loads(event) bucket = oss2.Bucket( oss2.Auth(OSS_KEYID, OSS_SECRET), OSS_LINK, OSS_BUCKET ) object_name = evt['events'][0]['oss']['object']['key'] remote_stream = bucket.get_object(object_name) remote_stream = remote_stream.read() task = pickle.loads(remote_stream) res, err = sendmail(**task) if err: logging.error(err) if res: bucket.delete_object(object_name)
由于没有使用任何需要安装的第三方库(oss2
是阿里无服务函数Python3 runtime自带的),所以不需要去管什么依赖安装。
最后fun deploy
一键部署上去,发送邮件的任务系统就弄完了。可能会有点让人诧异,但对于这种无状态的任务来说,无服务函数一键部署,就是这么简单。
那么接下来看看如何调用它去发送邮件。
import oss2 bucket = oss2.Bucket( oss2.Auth(OSS_KEYID, OSS_SECRET), OSS_LINK, OSS_BUCKET ) bucket.put_object(f'mail/test.task', pickle.dumps({ "nickname": 'Testing', "to_email": Email Address, "subject": "测试邮件", "content": "仅仅是测试" }))
如上代码在项目中使用时,应当封装成一个函数。