1. 函数式编程Functional Programming
1. 高阶函数
求绝对值的函数abs()
map()函数接收两个参数,一个是函数,一个是Iterable,map将传入的函数依次作用到序列的每个元素,并把结果作为新的Iterator返回。
#有一个函数f(x)=x2,要把这个函数作用在一个list [1, 2, 3, 4, 5, 6, 7, 8, 9]上
>>> def f(x):
... return x * x
...
>>> r = map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> list(r)
[1, 4, 9, 16, 25, 36, 49, 64, 81]
reduce()把一个函数作用在一个序列[x1, x2, x3, …]上,这个函数必须接收两个参数,reduce把结果继续和序列的下一个元素做累积计算
reduce(f, [x1, x2, x3, x4]) = f(f(f(x1, x2), x3), x4)
filter()函数用于过滤序列,把传入的函数依次作用于每个元素,然后根据返回值是True还是False决定保留还是丢弃该元素
#在一个list中,删掉偶数,只保留奇数
def is_odd(n):
return n % 2 == 1
list(filter(is_odd, [1, 2, 4, 5, 6, 9, 10, 15]))
# 结果: [1, 5, 9, 15]
sorted()函数就可以对list进行排序
>>> sorted([36, 5, -12, 9, -21])
[-21, -12, 5, 9, 36]
#sorted()函数也是一个高阶函数,它还可以接收一个key函数来实现自定义的排序,例如按绝对值大小排序
>>> sorted([36, 5, -12, 9, -21], key=abs)
[5, 9, -12, -21, 36]
2. 返回函数
#如果不需要立刻求和,而是在后面的代码中,根据需要再计算怎么办?可以不返回求和的结果,而是返回求和的函数:
def lazy_sum(*args):
def sum():
ax = 0
for n in args:
ax = ax + n
return ax
return sum
#调用lazy_sum()时,返回的并不是求和结果,而是求和函数
>>> f = lazy_sum(1, 3, 5, 7, 9)
>>> f
<function lazy_sum.<locals>.sum at 0x101c6ed90>
#调用函数f时,才真正计算求和的结果:
>>> f()
25
一个函数可以返回一个计算结果,也可以返回一个函数。
返回一个函数时,牢记该函数并未执行,返回函数中不要引用任何可能会变化的变量。
3. 匿名函数
lambda x: x * x
关键字lambda表示匿名函数,冒号前面的x表示函数参数。
匿名函数有个限制,就是只能有一个表达式,不用写return,返回值就是该表达式的结果。
4. 装饰器(Decorator)
def log(func):
def wrapper(*args, **kw):
print('call %s():' % func.__name__)
return func(*args, **kw)
return wrapper
@log
def now():
print('2015-3-25')
>>> now()
call now():
2015-3-25
由于log()是一个decorator,返回一个函数,所以,原来的now()函数仍然存在,只是现在同名的now变量指向了新的函数,于是调用now()将执行新函数,即在log()函数中返回的wrapper()函数。
wrapper()函数的参数定义是(*args, **kw),因此,wrapper()函数可以接受任意参数的调用。在wrapper()函数内,首先打印日志,再紧接着调用原始函数。
5. 偏函数
functools.partial的作用就是,把一个函数的某些参数给固定住(也就是设置默认值),返回一个新的函数,调用这个新函数会更简单
>>> import functools
>>> int2 = functools.partial(int, base=2)
>>> int2('1000000')
64
>>> int2('1010101')
85
用len(classmates)来获得list的长度
2. 面向对象高级编程
1. 使用__slots__
限制实例的属性
class Student(object):
__slots__ = ('name', 'age') # 用tuple定义允许绑定的属性名称
>>> s = Student() # 创建新的实例
>>> s.name = 'Michael' # 绑定属性'name'
>>> s.age = 25 # 绑定属性'age'
>>> s.score = 99 # 绑定属性'score'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'Student' object has no attribute 'score'
2. 使用@property
@property装饰器就是负责把一个方法变成属性调用的:
class Student(object):
@property
def score(self):
return self._score
@score.setter
def score(self, value):
if not isinstance(value, int):
raise ValueError('score must be an integer!')
if value < 0 or value > 100:
raise ValueError('score must between 0 ~ 100!')
self._score = value
>>> s = Student()
>>> s.score = 60 # OK,实际转化为s.set_score(60)
>>> s.score # OK,实际转化为s.get_score()
60
>>> s.score = 9999
Traceback (most recent call last):
...
ValueError: score must between 0 ~ 100!
3. 多重继承
class Dog(Mammal, RunnableMixIn, CarnivorousMixIn):
pass
在设计类的继承关系时,通常,主线都是单一继承下来的,但是,如果需要“混入”额外的功能,通过多重继承就可以实现,这种设计通常称之为MixIn
MixIn的目的就是给一个类增加多个功能,这样,在设计类的时候,我们优先考虑通过多重继承来组合多个MixIn的功能,而不是设计多层次的复杂的继承关系。
4. 定制类
重写类自带的方法
# __str__
>>> class Student(object):
... def __init__(self, name):
... self.name = name
... def __str__(self):
... return 'Student object (name: %s)' % self.name
...
>>> print(Student('Michael'))
Student object (name: Michael)
# __iter__
# __getitem__
# __getattr__
# __call__
5. 枚举类Enum
from enum import Enum
Month = Enum('Month', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'))
# 获得了Month类型的枚举类,可以直接使用Month.Jan来引用一个常量,value属性则是自动赋给成员的int常量,默认从1开始计数。
#需要更精确地控制枚举类型,可以从Enum派生出自定义类
from enum import Enum, unique
@unique
class Weekday(Enum):
Sun = 0 # Sun的value被设定为0
Mon = 1
Tue = 2
Wed = 3
Thu = 4
Fri = 5
Sat = 6
# @unique装饰器可以帮助我们检查保证没有重复值。
>>> day1 = Weekday.Mon
>>> print(day1)
Weekday.Mon
>>> print(Weekday.Tue.value)
2
6. 元类
type()运行时动态创建类
3. 错误、调试和测试
1. 错误处理
try…except…finally…的错误处理机制
#函数的定义
try:
print('try...')
r = 10 / 0
print('result:', r)
except ZeroDivisionError as e:
print('except:', e)
finally:
print('finally...')
print('END')
try...
except: division by zero
finally...
END
#认为某些代码可能会出错时,就可以用try来运行这段代码,如果执行出错,则后续代码不会继续执行,而是直接跳转至错误处理代码,
#即except语句块,执行完except后,如果有finally语句块,则执行finally语句块,至此,执行完毕
#如果没有错误发生,可以在except语句块后面加一个else,当没有错误发生时,会自动执行else语句
try:
print('try...')
r = 10 / int('2')
print('result:', r)
except ValueError as e:
print('ValueError:', e)
except ZeroDivisionError as e:
print('ZeroDivisionError:', e)
else:
print('no error!')
finally:
print('finally...')
print('END')
#logging模块可以非常容易地记录错误信息, 让程序继续执行下去
import logging
def foo(s):
return 10 / int(s)
def bar(s):
return foo(s) * 2
def main():
try:
bar('0')
except Exception as e:
logging.exception(e)
main()
print('END')
$ python3 err_logging.py
ERROR:root:division by zero
Traceback (most recent call last):
File "err_logging.py", line 13, in main
bar('0')
File "err_logging.py", line 9, in bar
return foo(s) * 2
File "err_logging.py", line 6, in foo
return 10 / int(s)
ZeroDivisionError: division by zero
END
#抛出错误
class FooError(ValueError):
pass
def foo(s):
n = int(s)
if n==0:
raise FooError('invalid value: %s' % s)
return 10 / n
foo('0')
try:
foo('0')
except ValueError as e:
print('ValueError!')
raise
#raise语句如果不带参数,就会把当前错误原样抛出
2. 调试
logging
import logging
logging.basicConfig(level=logging.INFO)
s = '0'
n = int(s)
logging.info('n = %d' % n)
print(10 / n)
$ python err.py
INFO:root:n = 0
Traceback (most recent call last):
File "err.py", line 8, in <module>
print(10 / n)
ZeroDivisionError: division by zero
# logging允许你指定记录信息的级别,有debug,info,warning,error等几个级别,当我们指定level=INFO时,
# logging.debug就不起作用了。同理,指定level=WARNING后,debug和info就不起作用了。这样一来,你可以放
# 心地输出不同级别的信息,也不用删除,最后统一控制输出哪个级别的信息。
#logging的另一个好处是通过简单的配置,一条语句可以同时输出到不同的地方,比如console和文件。
pdb
$ python -m pdb err.py
> /Users/michael/Github/learn-python3/samples/debug/err.py(2)<module>()
-> s = '0'
# 命令l来查看代码
# 命令n可以单步执行代码
# 命令p 变量名来查看变量
# 命令q结束调试
import pdb
s = '0'
n = int(s)
pdb.set_trace() # 运行到这里会自动暂停
print(10 / n)
#运行代码,程序会自动在pdb.set_trace()暂停并进入pdb调试环境
3. 单元测试
import unittest
class Student(object):
def __init__(self, name, score):
self.name = name
self.score = score
def get_grade(self):
if self.score >= 60:
return 'B'
if self.score >= 80:
return 'A'
return 'C'
class TestStudent(unittest.TestCase):
def test_80_to_100(self):
s1 = Student('Bart', 80)
s2 = Student('Lisa', 100)
self.assertEqual(s1.get_grade(), 'A')
self.assertEqual(s2.get_grade(), 'A')
def test_60_to_80(self):
s1 = Student('Bart', 60)
s2 = Student('Lisa', 79)
self.assertEqual(s1.get_grade(), 'B')
self.assertEqual(s2.get_grade(), 'B')
def test_0_to_60(self):
s1 = Student('Bart', 0)
s2 = Student('Lisa', 59)
self.assertEqual(s1.get_grade(), 'C')
self.assertEqual(s2.get_grade(), 'C')
def test_invalid(self):
s1 = Student('Bart', -1)
s2 = Student('Lisa', 101)
with self.assertRaises(ValueError):
s1.get_grade()
with self.assertRaises(ValueError):
s2.get_grade()
if __name__ == '__main__':
unittest.main()
#当做正常的python脚本运行:
$ python mydict_test.py
#在命令行通过参数-m unittest直接运行单元测试
$ python -m unittest mydict_test
.....
----------------------------------------------------------------------
Ran 5 tests in 0.000s
OK
setUp与tearDown
可以在单元测试中编写两个特殊的setUp()和tearDown()方法。这两个方法会分别在每调用一个测试方法的前后分别被执行。
setUp()和tearDown()方法有什么用呢?设想你的测试需要启动一个数据库,这时,就可以在setUp()方法中连接数据库,
在tearDown()方法中关闭数据库,这样,不必在每个测试方法中重复相同的代码:
class TestDict(unittest.TestCase):
def setUp(self):
print('setUp...')
def tearDown(self):
print('tearDown...')
4. 文档测试doctest
“文档测试”(doctest)模块可以直接提取注释中的代码并执行测试。
def abs(n):
'''
Function to get absolute value of number.
Example:
>>> abs(1)
1
>>> abs(-1)
1
>>> abs(0)
0
'''
return n if n >= 0 else (-n)
doctest严格按照Python交互式命令行的输入和输出来判断测试结果是否正确。只有测试异常的时候,可以用…表示中间一大段烦人的输出。
4. IO编程
1. 文件读写
读文件
#标示符'r'表示读
#读取二进制文件,用'rb'模式打开
with open('/path/to/file', 'r') as f:
print(f.read())
# 调用read()会一次性读取文件的全部内容,如果文件有10G,内存就爆了,所以,要保险起见,可以反复调用read(size)方法,每次
# 最多读取size个字节的内容。
# 另外,调用readline()可以每次读取一行内容,调用readlines()一次读取所有内容并按行返回list。因此,要根据需要决定怎么调用。
# 如果文件很小,read()一次性读取最方便;如果不能确定文件大小,反复调用read(size)比较保险;如果是配置文件,调用readlines()最方便:
for line in f.readlines():
print(line.strip()) # 把末尾的'\n'删掉
写文件
with open('/Users/michael/test.txt', 'w') as f:
f.write('Hello, world!')
#以'w'模式写入文件时,如果文件已存在,会直接覆盖, 'a'以追加(append)模式写入
2. StringIO和BytesIO
StringIO顾名思义就是在内存中读写str。
>>> from io import StringIO
>>> f = StringIO()
>>> f.write('hello')
5
>>> f.write(' ')
1
>>> f.write('world!')
6
>>> print(f.getvalue())
hello world!
#要读取StringIO,可以用一个str初始化StringIO,然后,像读文件一样读取:
>>> from io import StringIO
>>> f = StringIO('Hello!\nHi!\nGoodbye!')
>>> while True:
... s = f.readline()
... if s == '':
... break
... print(s.strip())
...
Hello!
Hi!
Goodbye!
StringIO操作的只能是str,如果要操作二进制数据,就需要使用BytesIO。
>>> from io import BytesIO
>>> f = BytesIO()
>>> f.write('中文'.encode('utf-8'))
6
>>> print(f.getvalue())
b'\xe4\xb8\xad\xe6\x96\x87'
>>> from io import BytesIO
>>> f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87')
>>> f.read()
2. 操作文件和目录
>>> import os
# 获取某个环境变量的值
>>> os.environ.get('PATH')
# 查看当前目录的绝对路径:
>>> os.path.abspath('.')
'/Users/michael'
# 在某个目录下创建一个新目录,首先把新目录的完整路径表示出来:
>>> os.path.join('/Users/michael', 'testdir')
'/Users/michael/testdir'
# 然后创建一个目录:
>>> os.mkdir('/Users/michael/testdir')
# 删掉一个目录:
>>> os.rmdir('/Users/michael/testdir')
# 把两个路径合成一个时,不要直接拼字符串,而要通过os.path.join()函数,这样可以正确处理不同操作系统的路径分隔符
# 要拆分路径时,也不要直接去拆字符串,而要通过os.path.split()函数,这样可以把一个路径拆分为两部分,后一部分总是最后级别的目录或文件名
>>> os.path.split('/Users/michael/testdir/file.txt')
('/Users/michael/testdir', 'file.txt')
>>> os.path.splitext('/path/to/file.txt')
('/path/to/file', '.txt')
# 对文件重命名:
>>> os.rename('test.txt', 'test.py')
# 删掉文件:
>>> os.remove('test.py')
shutil模块提供了copyfile()的函数,在shutil模块中找到很多实用函数,它们可以看做是os模块的补充
#列出当前目录下的所有目录
>>> [x for x in os.listdir('.') if os.path.isdir(x)]
['.lein', '.local', '.m2', '.npm', '.ssh', '.Trash', '.vim', 'Applications', 'Desktop', ...]
3. 序列化
把变量从内存中变成可存储或传输的过程称之为序列化,在Python中叫pickling,在其他语言中也被称之为serialization,marshalling,flattening等等,都是一个意思。
用json格式进行序列化
5. 进程和线程
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,
所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了
1. 多进程
multiprocessing模块提供了一个Process类来代表一个进程对象
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
#创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
Pool进程池
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
# 由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
子进程
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
#在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的
#如果子进程还需要输入,则可以通过communicate()方法输入:
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
进程间通信Queue、Pipes
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
2. 多线程
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,
对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块
启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行
import time, threading
# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
Lock锁
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()
# 当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
# 获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放。
3. 多核CPU问题
Python启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。
但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%
Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,
任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的
线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能
交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。多线程的并发在Python中就是一个美丽的梦。
4. ThreadLocal
一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把
local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也
不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的
处理函数都可以非常方便地访问这些资源。
5. 分布式进程
在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,
将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
6. 正则表达式
Python提供re模块,包含所有正则表达式的功能,强烈建议使用Python的r前缀,就不用考虑转义的问题了
test = '用户输入的字符串'
m = re.match(r'正则表达式', test)
if m:
print('ok')
else:
print('failed')
# groups进行分组
m.groups()
match()方法判断是否匹配,如果匹配成功,返回一个Match对象,否则返回None
7. 常用内建模块
1. datetime
datetime是Python处理日期和时间的标准库。
>>> from datetime import datetime
>>> now = datetime.now() # 获取当前datetime
>>> print(now)
2015-05-18 16:28:07.198690
datetime转换为timestamp
在计算机中,时间实际上是用数字表示的。我们把1970年1月1日 00:00:00 UTC+00:00时区的时刻称为epoch time,
记为0(1970年以前的时间timestamp为负数),当前时间就是相对于epoch time的秒数,称为timestamp。
>>> from datetime import datetime
>>> dt = datetime(2015, 4, 19, 12, 20) # 用指定日期时间创建datetime
>>> dt.timestamp() # 把datetime转换为timestamp
1429417200.0
# 注意Python的timestamp是一个浮点数。如果有小数位,小数位表示毫秒数。
timestamp转换为datetime
>>> from datetime import datetime
>>> t = 1429417200.0
>>> print(datetime.fromtimestamp(t))
2015-04-19 12:20:00
str转换为datetime
>>> from datetime import datetime
>>> cday = datetime.strptime('2015-6-1 18:19:59', '%Y-%m-%d %H:%M:%S')
>>> print(cday)
2015-06-01 18:19:59
datetime转换为str
>>> from datetime import datetime
>>> now = datetime.now()
>>> print(now.strftime('%a, %b %d %H:%M'))
Mon, May 05 16:28
如果要存储datetime,最佳方法是将其转换为timestamp再存储,因为timestamp的值与时区完全无关。
2. collections
collections是Python内建的一个集合模块,提供了许多有用的集合类。
namedtuple
#namedtuple是一个函数,它用来创建一个自定义的tuple对象,并且规定了tuple元素的个数,并可以用属性而不是索引来引用tuple的某个元素。
#namedtuple可以很方便地定义一种数据类型,它具备tuple的不变性,又可以根据属性来引用,使用十分方便
>>> from collections import namedtuple
>>> Point = namedtuple('Point', ['x', 'y'])
>>> p = Point(1, 2)
>>> p.x
1
>>> p.y
2
deque
# deque是为了高效实现插入和删除操作的双向列表,适合用于队列和栈
>>> from collections import deque
>>> q = deque(['a', 'b', 'c'])
>>> q.append('x')
>>> q.appendleft('y')
>>> q
deque(['y', 'a', 'b', 'c', 'x'])
defaultdict
# 使用dict时,如果引用的Key不存在,就会抛出KeyError。如果希望key不存在时,返回一个默认值,就可以用defaultdict
>>> from collections import defaultdict
>>> dd = defaultdict(lambda: 'N/A')
>>> dd['key1'] = 'abc'
>>> dd['key1'] # key1存在
'abc'
>>> dd['key2'] # key2不存在,返回默认值
'N/A'
OrderedDict
# 使用dict时,Key是无序的。在对dict做迭代时,我们无法确定Key的顺序。
# 如果要保持Key的顺序,可以用OrderedDict
ChainMap
# ChainMap可以把一组dict串起来并组成一个逻辑上的dict。ChainMap本身也是一个dict,但是查找的时候,会按照顺序在内部的dict依次查找
Counter
# Counter是一个简单的计数器,例如,统计字符出现的个数:
>>> from collections import Counter
>>> c = Counter()
>>> for ch in 'programming':
... c[ch] = c[ch] + 1
...
>>> c
Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1})
3. hashlib
hashlib提供了常见的摘要算法,如MD5,SHA1等等
摘要算法又称哈希算法、散列算法。它通过一个函数,把任意长度的数据转换为一个长度固定的数据串(通常用16进制的字符串表示)
MD5是最常见的摘要算法,速度很快,生成结果是固定的128 bit字节,通常用一个32位的16进制字符串表示。
SHA1的结果是160 bit字节,通常用一个40位的16进制字符串表示。
比SHA1更安全的算法是SHA256和SHA512,不过越安全的算法不仅越慢,而且摘要长度更长。
import hashlib
md5 = hashlib.md5()
md5.update('how to use md5 in python hashlib?'.encode('utf-8'))
print(md5.hexdigest())
由于常用口令的MD5值很容易被计算出来,所以,要确保存储的用户口令不是那些已经被计算出来的常用口令的MD5,这一方法通过对原始口令加一个
复杂字符串来实现,俗称“加盐”:
def calc_md5(password):
return get_md5(password + 'the-Salt')
def get_md5(s):
return hashlib.md5(s.encode('utf-8')).hexdigest()
4. hmac
为了防止黑客通过彩虹表根据哈希值反推原始口令,在计算哈希的时候,不能仅针对原始输入计算,需要增加一个salt来使得相同的
输入也能得到不同的哈希,这样,大大增加了黑客破解的难度。
如果salt是我们自己随机生成的,通常我们计算MD5时采用md5(message + salt)。但实际上,把salt看做一个“口令”,加
salt的哈希就是:计算一段message的哈希时,根据不通口令计算出不同的哈希。要验证哈希值,必须同时提供正确的口令。
这实际上就是Hmac算法:Keyed-Hashing for Message Authentication。它通过一个标准算法,在计算哈希的过程中,把key混入计算过程中
>>> import hmac
>>> message = b'Hello, world!'
>>> key = b'secret'
>>> h = hmac.new(key, message, digestmod='MD5')
>>> # 如果消息很长,可以多次调用h.update(msg)
>>> h.hexdigest()
'fa4ee7d173f2d97ee79022d1a7355bcf'
字符串b开头表示字符串是bytes类型的, string与bytes互转
str转bytes:
bytes('123', encoding='utf8')
str.encode('123')
bytes转str:
str(b'123', encoding='utf-8')
bytes.decode(b'123')
5. JSON
json.loads(json_str) # json字符串转换成字典
json.dumps(dict) # 字典转换成json字符串
# json.dumps 序列化时对中文默认使用的ascii编码.想输出真正的中文需要指定ensure_ascii=False:
json.dumps('中国',ensure_ascii=False)