python尾递归优化


参考 这里

class TailRecurseException(BaseException):
    def __init__(self, args, kwargs):
        self.args = args
        self.kwargs = kwargs


def tail_call_optimized(g):
    """
    This function decorates a function with tail call
    optimization. It does this by throwing an exception
    if it is it's own grandparent, and catching such
    exceptions to fake the tail call optimization.

    This function fails if the decorated
    function recurses in a non-tail context.
    """

    def func(*args, **kwargs):
        f = sys._getframe()
        # 为什么是grandparent, 函数默认的第一层递归是父调用,
        # 对于尾递归, 不希望产生新的函数调用(即:祖父调用),
        # 所以这里抛出异常, 拿到参数, 退出被修饰函数的递归调用栈!(后面有动图分析)
        if f.f_back and f.f_back.f_back \
            and f.f_back.f_back.f_code == f.f_code:
            # 抛出异常
            raise TailRecurseException(args, kwargs)
        else:
            while 1:
                try:
                    return g(*args, **kwargs)
                except TailRecurseException as e:
                    # 捕获异常, 拿到参数, 退出被修饰函数的递归调用栈
                    args = e.args
                    kwargs = e.kwargs

    func.__doc__ = g.__doc__
    return func

测试

@tail_call_optimized
def factorial(n, acc=1):
    "calculate a factorial"
    from pudb import set_trace
    set_trace()
    if n == 0:
        return acc
    return factorial(n - 1, n + acc)

print factorial(10000)

There are comments.

  • 使用tornado实时输出日志


    import tornado
    from tornado.web import Application
    from tornado.web import RequestHandler
    from tornado.websocket import WebSocketHandler
    import os
    import json
    
    template = '''<!doctype html>
    <html lang="en">
      <head>
        <meta charset="UTF-8"/>
        <title>Document</title>
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <script src="https://cdn.bootcss.com/jquery …
  • python运行超时设置


    import signal
    
    class TimeoutError(Exception):
        pass
    
    def timeout(seconds=10, error_message="Timer expired"):
        def _timeout(func):
            def _handle_timeout(signum, frame):
                raise TimeoutError(error_message)
    
            def wrapper(*args, **kwargs):
                signal.signal(signal.SIGALRM, _handle_timeout)
                signal.alarm(seconds)
                try:
                    result = func(*args, **kwargs)
                finally:
                    signal.alarm(0)
                return result
    
            return wrapper
    
        return …
  • 为pelican增加上一篇下一篇功能


    功能实现很简单, pelican也有类似的插件实现 https://github.com/getpelican/pelican-plugins/tree/master/neighbors

    不过自己实现起来更爽不是吗? 而且类似prev_article_in_categoryprev_article_in_subcategory的功能我并不需要

    首先参考pelican的文档,基础实现

    def add_neighbors(generator):
        ....
    
    def register():
        signals.article_generator_finalized.connect(add_neighbors)
    

    然后在generator.articles列表上进行操作,为article增加previous_articlenext_article对象

    先进行排序(按时间)

    articles = generator.articles
    articles.sort(key=(lambda x …

  • tornado中session实现


    tornado中默认没有session的实现,虽然默认的 set_secure_cookie 已经足够安全了,但更安全的应该是客户端保存session_id,服务端保存对应的信息

    注:保存在redis中的经测试是可以的,保存在内存中的貌似还不行

    给出源码:

    from uuid import uuid4
    from redis import StrictRedis
    from functools import wraps
    from datetime import datetime, timedelta
    from pytz import timezone
    
    
    def singleton(cls):
        instances = {}
    
        @wraps(cls)
        def getinstance(*args, **kw):
            if cls not in instances:
                instances[cls] = cls …
  • 一个简单的数据分组算法


    Table of Contents

    之前有这么一个需求

    将数据按照时间分组,比如说每5分钟为一组,或者每30分钟为一组,开始时间如果分钟数大于等于30分钟,则设置为30,如果小于30,则设置为0(也可以设置为更复杂的逻辑)

    当然中间还有一些数据的统计,分析和计算暂时不管

    获取开始时间

    def get_start_date(start_date):
        time = datetime.fromtimestamp(int(start_date) / 1000)
        if time.minute >= 30:
            minute = 30
        else:
            minute = 0
        start_date = datetime(time.year, time.month, time.day, time.hour, minute,
                              0)
        start_date = mktime …
  • celery动态添加任务


    celery是一个基于Python的分布式调度系统,文档在这 ,最近有个需求,想要动态的添加任务而不用重启celery服务,找了一圈没找到什么好办法(也有可能是文档没看仔细),所以只能自己实现囉

    为celery动态添加任务,首先我想到的是传递一个函数进去,让某个特定任务去执行这个传递过去的函数,就像这样

    @app.task
    def execute(func, *args, **kwargs):
        return func(*args, **kwargs)
    

    很可惜,会出现这样的错误

    kombu.exceptions.EncodeError: Object of type 'function' is not JSON serializable
    

    换一种序列化方式

    @app.task(serializer='pickle')
    def execute(func, *args, **kwargs):
        return func(*args, **kwargs …

  • 基于whoosh实现的flask全文搜索插件


    flask 貌似很少全文搜索的插件,有一个 https://github.com/gyllstromk/Flask-WhooshAlchemy , 但试了几次都用不了,所以参考 Flask-WhooshAlchemy 自己写了一个

    Quickstart

    插件基于 whoosh,纯 python 编写,使用上很简单

    from flask_msearch import Search
    [...]
    search = Search()
    search.init_app(app)
    
    # models.py
    class Post(db.Model):
        __tablename__ = 'post'
        __searchable__ = ['title', 'content']
    
    # views.py …

  • himawari8图片下载改进版


    第一版下载图片后设为壁纸,四周都是黑黑的不好看, 所以结合本地壁纸将两张图片进行合成

    只要计算一下要缩减的大小与要放置的位置

    我的方法是:打开 gimp,合成两张图片,记下缩减的大小(214,214)与位置(160,160)

    每次只要运行一下

    python himawari8.py
    

    源码

    from PIL import Image, ImageOps, ImageDraw
    from io import BytesIO
    from urllib.request import Request, urlopen
    from datetime import datetime, timedelta
    import json
    
    SCALE = 2
    WIDTH = 1368
    HEIGHT = 768
    
    
    def …