阅读python文档

This commit is contained in:
asahi
2024-05-04 03:19:50 +08:00
parent 11850cbb51
commit baac0d4fee

View File

@@ -482,3 +482,215 @@ def fuck(*name_list):
fuck("Nvidia", "Amd")
```
## 文件操作
### 文件读取
```py
with open('pi_digits.txt') as file_object:
contents = file_object.read()
print(contents)
```
### 写入文件
```py
filename = 'programming.txt'
with open(filename, 'w') as file_object:
file_object.write("I love programming.")
file_object.write("I love creating new games.")
```
### 文件末尾追加
```py
filename = 'programming.txt'
with open(filename, 'a') as file_object:
file_object.write("I also love finding meaning in large datasets.\n")
file_object.write("I love creating apps that can run in a browser.\n")
```
## 异常处理
在python中可以通过如下语法来进行异常处理
```py
try:
# code block
pass
except ErrorType1:
# error type 1 handling code block
pass
except ErrorType2:
# error type 2 handling code block
pass
except Exception as result:
# handle other exception types
pass
else:
# code block that no error occurs
pass
finally:
# code block executed whether exception occurs
pass
```
### 抛异常
#### 捕获异常后重新抛出异常
如果python在捕获异常后需要重新对异常进行抛出可以使用`raise`关键字
```py
try:
m = 10/0
except Exception as e:
print(e)
raise
finally:
print("handle correctly")
```
#### 主动抛出异常
主动抛出异常时,也可使用`raise`关键字
```py
try:
raise Exception("fucking Nvidia raised their price!")
except Exception as e:
print(e)
finally:
print("my wallet is empty!")
```
## 数据存储
### json.dump
可以通过`json.dump`方法将内容以json的格式存储到文件中
```py
import json
def save_as_json(data, fd):
json.dump(data, fd)
waifu = {
"name": "Touma Kazusa",
"job": "Student",
"gender": "female",
"age": 17
}
is_save_success = True
try:
with open("shiro.json", "w") as shiro_fd:
save_as_json(waifu, shiro_fd)
except Exception as e:
print(e)
is_save_success = False
exit(1)
finally:
if is_save_success:
print("save success!")
```
### json.load
可以通过`json.load`方法读取文件中的json格式内容
```py
import json
def read_as_json(fd):
return json.load(fd)
is_read_success = True
try:
with open("shiro.json", "r") as shiro_fd:
r_obj = read_as_json(shiro_fd)
print(r_obj)
except Exception as e:
print(e)
is_read_success = False
exit(1)
finally:
if is_read_success:
print("read success!")
```
## http api
python安装requests包之后可以访问http接口
```py
import math
import requests
import time
try:
body = requests.get("https://api.m.taobao.com/rest/api3.do", {
"api": "mtop.common.getTimestamp"
}).json()
ts = body['data']['t']
t = time.localtime(math.trunc(int(ts) / 1000))
time_str = time.strftime("%Y-%m-%d %H:%M:%S %z", t)
print(time_str)
except Exception as e:
print(e)
raise
```
## 多线程
python可以通过引入threading和concurrent.futures两个包来引入多线程
```py
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import requests
ts_arr = []
lock = threading.Lock()
def query_ts():
try:
body = requests.get("https://api.m.taobao.com/rest/api3.do", {
"api": "mtop.common.getTimestamp"
}).json()
except Exception as e:
print(e)
raise
def query_ts_once():
ts_beg = time.time_ns()
query_ts()
ts_end = time.time_ns()
ts_spend = math.trunc((ts_end - ts_beg) / 1000000)
with lock:
ts_arr.append(ts_spend)
def stress_test(test_cnt):
with ThreadPoolExecutor(max_workers=32) as executor:
i = 0
f_list = []
while i < test_cnt:
f_t = executor.submit(query_ts_once)
f_list.append(f_t)
i += 1
as_completed(f_list)
stress_test(100)
print(f"max ts : {max(ts_arr)} ms")
print(f"min ts : {min(ts_arr)} ms")
print(f"avg ts : {sum(ts_arr) / len(ts_arr)} ms")
print(ts_arr)
```
## linux命令交互
如果想要调用系统linux的命令可以使用subprocess模块。
```py
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)
```
调用示例如下:
```py
ret = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(ret.stdout)
```
输出如下:
```bash
total 4
-rwxrwxrwx 1 asahi asahi 110 May 4 03:18 main.py
-rwxrwxrwx 1 asahi asahi 73 May 4 01:10 shiro.json
drwxrwxrwx 1 asahi asahi 4096 Feb 7 21:39 venv
```