diff --git a/python/py.md b/python/py.md index 2f780ca..89330cd 100644 --- a/python/py.md +++ b/python/py.md @@ -481,4 +481,216 @@ def fuck(*name_list): fuck("Nvidia", "Amd") -``` \ No newline at end of file +``` +## 文件操作 +### 文件读取 +```py +with open('pi_digits.txt') as file_object: + contents = file_object.read() + print(contents) +``` +### 写入文件 +```py +filename = 'programming.txt' +with open(filename, 'w') as file_object: + file_object.write("I love programming.") + file_object.write("I love creating new games.") +``` + +### 文件末尾追加 +```py +filename = 'programming.txt' +with open(filename, 'a') as file_object: + file_object.write("I also love finding meaning in large datasets.\n") + file_object.write("I love creating apps that can run in a browser.\n") +``` +## 异常处理 +在python中,可以通过如下语法来进行异常处理 +```py +try: + # code block + pass +except ErrorType1: + # error type 1 handling code block + pass +except ErrorType2: + # error type 2 handling code block + pass +except Exception as result: + # handle other exception types + pass +else: + # code block that no error occurs + pass +finally: + # code block executed whether exception occurs + pass +``` + +### 抛异常 +#### 捕获异常后重新抛出异常 +如果python在捕获异常后需要重新对异常进行抛出,可以使用`raise`关键字 +```py +try: + m = 10/0 +except Exception as e: + print(e) + raise +finally: + print("handle correctly") +``` +#### 主动抛出异常 +主动抛出异常时,也可使用`raise`关键字 +```py +try: + raise Exception("fucking Nvidia raised their price!") +except Exception as e: + print(e) +finally: + print("my wallet is empty!") +``` + +## 数据存储 +### json.dump +可以通过`json.dump`方法将内容以json的格式存储到文件中: +```py +import json + + +def save_as_json(data, fd): + json.dump(data, fd) + + +waifu = { + "name": "Touma Kazusa", + "job": "Student", + "gender": "female", + "age": 17 +} + +is_save_success = True +try: + with open("shiro.json", "w") as shiro_fd: + save_as_json(waifu, shiro_fd) +except Exception as e: + print(e) + is_save_success = False + exit(1) +finally: + if is_save_success: + print("save success!") +``` +### json.load +可以通过`json.load`方法读取文件中的json格式内容: +```py +import json + + +def read_as_json(fd): + return json.load(fd) + + +is_read_success = True +try: + with open("shiro.json", "r") as shiro_fd: + r_obj = read_as_json(shiro_fd) + print(r_obj) +except Exception as e: + print(e) + is_read_success = False + exit(1) +finally: + if is_read_success: + print("read success!") +``` +## http api +python安装requests包之后,可以访问http接口 +```py +import math +import requests +import time + +try: + body = requests.get("https://api.m.taobao.com/rest/api3.do", { + "api": "mtop.common.getTimestamp" + }).json() + ts = body['data']['t'] + t = time.localtime(math.trunc(int(ts) / 1000)) + time_str = time.strftime("%Y-%m-%d %H:%M:%S %z", t) + print(time_str) +except Exception as e: + print(e) + raise +``` + +## 多线程 +python可以通过引入threading和concurrent.futures两个包来引入多线程: +```py +import math +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading + +import requests + +ts_arr = [] +lock = threading.Lock() + + +def query_ts(): + try: + body = requests.get("https://api.m.taobao.com/rest/api3.do", { + "api": "mtop.common.getTimestamp" + }).json() + except Exception as e: + print(e) + raise + + +def query_ts_once(): + ts_beg = time.time_ns() + query_ts() + ts_end = time.time_ns() + ts_spend = math.trunc((ts_end - ts_beg) / 1000000) + with lock: + ts_arr.append(ts_spend) + + +def stress_test(test_cnt): + with ThreadPoolExecutor(max_workers=32) as executor: + i = 0 + f_list = [] + while i < test_cnt: + f_t = executor.submit(query_ts_once) + f_list.append(f_t) + i += 1 + as_completed(f_list) + + +stress_test(100) +print(f"max ts : {max(ts_arr)} ms") +print(f"min ts : {min(ts_arr)} ms") +print(f"avg ts : {sum(ts_arr) / len(ts_arr)} ms") +print(ts_arr) +``` + +## linux命令交互 +如果想要调用系统(linux)的命令,可以使用subprocess模块。 +```py +subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None) +``` +调用示例如下: +```py +ret = subprocess.run(['ls', '-l'], capture_output=True, text=True) +print(ret.stdout) +``` +输出如下: +```bash +total 4 +-rwxrwxrwx 1 asahi asahi 110 May 4 03:18 main.py +-rwxrwxrwx 1 asahi asahi 73 May 4 01:10 shiro.json +drwxrwxrwx 1 asahi asahi 4096 Feb 7 21:39 venv +``` + + +