查壳后发现用了最新的360加固,于是在脱壳上面废了几天时间,主要是不会IDA分析内存,所以只有各种查资料去找别人的工具了,最后通过 frida-unpack 成功脱壳,具体可以看上篇文章:初入APK逆向破解小记 这里就不过多阐述了。
脱壳后的代码是 Java 代码,拿着自己半吊子的功底在加上 open AI ,倒是也能做到审计了。
思路1:通过python+frida hook 最新消息接受方法
在这里卡了一段时间,最开始以为是消息接受处理存在问题,所以又改成了接受消息存本地数据库,然后从数据库取出消息在处理,最后无奈发现因为python调用frida hook的原因,是必然会遗漏些频率高的消息的,不管用不用数据库。。。
虽然遗漏一些消息,但是只要能实现检测到频繁者,能撤回绝大部分消息并自动踢人,其实也算完工了,但是我这该死的完美主义,消息遗漏+配合模拟器ADB这种繁琐的方式,让我这个完美主义者不能忍,所以最终 PASS 掉了这个方法。
import frida,json,re,time,datetime,requests,hashlib,threading,os,queue import jaydebeapi from collections import defaultdict def md5_hash(text): md5_hasher = hashlib.md5() md5_hasher.update(text.encode('utf-8')) hashed_text = md5_hasher.hexdigest() return hashed_text def md5Psw(account, pwd): hashed_pwd = md5_hash( md5_hash(pwd) + account + 'MOSGRAM') return hashed_pwd def remove_special_characters(input_string): try: # 定义特殊字符的正则表达式模式 special_char_pattern = re.compile('[^x00-x7F]+') # 使用正则表达式替换特殊字符为空字符串 cleaned_string = special_char_pattern.sub('', input_string) return cleaned_string except Exception as e: print("[!] 异常发生:", e) return input_string def data_to_json(data): try: pattern = r'(w+)s*=s*([^,]+)' result = [] records = data.split('], [') for record in records: match = re.findall(pattern, record) record_dict = {} for key, value in match: if value.startswith('{') and value.endswith('}'): record_dict[key] = json.loads(value) else: record_dict[key] = value result.append(record_dict) parsed_data = json.dumps(result, indent=4) return parsed_data except json.JSONDecodeError as json_error: print("[!] 数据转换失败: JSON 解析错误:", json_error) except Exception as e: print("[!] 数据转换失败:", e) return None def custom_request(method, url, json_data=None, headers=None): try: if method == 'POST': response = requests.get(url, headers=headers) elif method == 'POST': response = requests.post(url, json=json_data, headers=headers) elif method == 'DELETE': response = requests.delete(url, json=json_data, headers=headers) else: raise ValueError("[!] 不支持的请求方法: " + method) return response.json() except requests.exceptions.RequestException as e: print("[!] 网络请求异常:", e) return None def usb_device(): try: # 连接安卓机上的frida-server device = frida.get_usb_device(1000) # 启动app pid = device.spawn(["com.paopaotalk.im"]) device.resume(pid) session = device.attach(pid) return session except: exit("[!] 启动应用APP出错,重新执行脚本!") # 消息回调函数,处理从脚本发送的消息 def on_message(message, payload): try: global call_back_message if message['type'] == 'send': call_back_message = message['payload'] return call_back_message except: exit("[!] 消息回调失败,重新执行脚本!") def hook_function(session): try: global script if script: script.unload() # 释放之前的脚本 script_source = """ Java.perform(function() { var getVal = Java.use('v5.g'); var cons = getVal.l(); var send_it = cons.toString(); send(send_it); }); """ script = session.create_script(script_source) script.on('message', on_message) script.load() return script except: exit("[!] 注入脚本失败,重新执行脚本!") #登录 def login(username,pwd): try: print("[-] 正在登录账号...") login_data_json = {"username": "aaa","password": "bbb","device_type":"WEB"} login_data_json["username"] = username login_data_json["password"] = md5Psw(username, pwd) login_header_json = {"Content-Type":"application/json; charset=UTF-8","X-Requested-With":"XMLHttpRequest","App_id":"qiyunxin","timestamp": str(int(time.time() * 1000)),"sign": md5_hash(str(int(time.time() * 1000)) + "WEB"),} logins = custom_request('POST', url='http://api.vvchat.im/userservices/v2/user/login', json_data=login_data_json, headers=login_header_json) if logins['token'] and logins['open_id']: print("[!] 登录账号成功!") return logins except: exit("[!] 登录失败,重新执行脚本!") #数据转换集合 def zhuanhuan(data): data_json = json.loads(data_to_json(call_back_message)) return data_json def tiren(chat_name, from_cust_id): if chat_name in groups_list: open_id = custom_request('GET', url=f"http://api.vvchat.im/groupapi/v1/users/{logins['open_id']}/groups/{groups_list[chat_name[1]]}/members?page_size=100000", headers={"Content-Type": "application/json; charset=UTF-8", "Authorization": logins['token']}) if open_id: if chs['err_msg'] == 'OK': print(f"[-] 群消息 {row[0]} 撤回成功!") delete_data(f"DELETE FROM data WHERE msg_no = '{row[0]}' AND chat_name = '{row[1]}';") else: print(f"[!] 群消息 {row[0]} 撤回失败,原因:{chs['err_msg']} 请检查...") delete_data(f"DELETE FROM data WHERE msg_no = '{row[0]}' AND chat_name = '{row[1]}';") else: msg_no_queue.put(row) print(f"[!] 群消息 {row[0]} 撤回失败,已重新添加到列队!") #持续不间断接受新数据并转存到列队 def run1(): global unique_data_set while True: hook_function(session) if call_back_message: call_back_message_json = zhuanhuan(remove_special_characters(call_back_message)) for item in call_back_message_json: if groups in item.get("chat_name", "") and item.get("chat_type", "") == "2": data = (item.get("chat_name", ""),item.get("msg_no", ""),item.get("from_cust_id", ""),item.get("content", ""),item.get("msg_time", "")) if item.get("content", "") != '{0}开启群禁言' and item.get("content", "") != '{0}撤回了一条消息' and item.get("content", "") != '' and data not in unique_data_set: ls_queue.put(data) unique_data_set.add(data) time.sleep(0.0001) #从列表中提取数据判断是否存在 def run2(): while True: with search_queue_lock: while not ls_queue.empty(): data_to_insert = ls_queue.get() # 从队列取出数据 search_msg_no = query_data(f"select id from data where msg_no = '{data_to_insert[2]}'") if not search_msg_no: insert_queue.put(data_to_insert) time.sleep(0.001) #将insert_queue存储的最新发信信息插入到数据库中 def run3(): while True: with search_queue_lock: while not insert_queue.empty(): data_to_insert = insert_queue.get() # 从队列取出数据 print("[-] 新数据:",data_to_insert) insert_data(data_to_insert) time.sleep(0.001) #搜索数据库,找出违规者所有发信MSG_NO编号,加入到列队中 def run4(): while True: with search_queue_lock: if msg_no_queue.empty() and insert_queue.empty() and ls_queue.empty(): #数据赛选,提取违规者发的全部信息 MSG_NO编号 select_query = """ WITH UniqueChatNames AS ( SELECT chat_name, from_cust_id FROM DATA WHERE id IN ( SELECT MIN(id) FROM (SELECT id,chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9) FROM DATA WHERE (chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9)) IN ( SELECT chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9) FROM DATA GROUP BY chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9) HAVING COUNT(*) > 1 ) ) grouped_data GROUP BY chat_name, from_cust_id ) ) SELECT STRINGDECODE (d.msg_no), STRINGDECODE (d.chat_name) FROM ( SELECT *, ROW_NUMBER () OVER ( PARTITION BY chat_name, from_cust_id, msg_no ORDER BY id ) AS rn FROM DATA ) d JOIN UniqueChatNames subquery ON d.chat_name = subquery.chat_name AND d.from_cust_id = subquery.from_cust_id WHERE d.rn = 1; """ results = query_data(select_query) if results:#列表类型 msg_no_queue.put(results) print(f"[!] 已检测到 {len(results)} 条违规信息.") time.sleep(0.001) #提取msg_no编号,并撤回这些编号消息 def run5(): #登录 logins = login(username,pwd) while True: with search_queue_lock: while not msg_no_queue.empty(): msg_no_list = msg_no_queue.get() for row in msg_no_list: if row[1] in groups_list: data = {"msg_no": row[0],"msg_time": int(time.time() * 1000)} chs = custom_request('POST', url=f"http://api.vvchat.im/groupapi/v1/users/{logins['open_id']}/groups/{groups_list[row[1]]}/revoke_msg", json_data=data, headers={"Content-Type": "application/json; charset=UTF-8", "Authorization": logins['token']}) if chs: if chs['err_msg'] == 'OK': print(f"[-] 群消息 {row[0]} 撤回成功!") delete_data(f"DELETE FROM data WHERE msg_no = '{row[0]}' AND chat_name = '{row[1]}';") else: print(f"[!] 群消息 {row[0]} 撤回失败,原因:{chs['err_msg']} 请检查...") delete_data(f"DELETE FROM data WHERE msg_no = '{row[0]}' AND chat_name = '{row[1]}';") else: msg_no_queue.put(row) print(f"[!] 群消息 {row[0]} 撤回失败,已重新添加到列队!") else: exit("[!] 违规所在群 ID 获取失败, 代码中添加该群ID!") time.sleep(0.001) #5分钟清理一次数据库 def run6(): global unique_data_set while True: time.sleep(300) # 五分钟 unique_data_set.clear() delete_data("TRUNCATE TABLE data;") #清空数据库 # 创建数据库连接 def create_connection(): conn = jaydebeapi.connect("org.h2.Driver", h2_jdbc_url, [h2_user, h2_password], h2_driver_path) return conn # 创建表 def create_table(): conn = create_connection() create_table_sql = """ CREATE TABLE IF NOT EXISTS data ( id BIGINT AUTO_INCREMENT PRIMARY KEY, chat_name TEXT NOT NULL, msg_no TEXT NOT NULL, from_cust_id TEXT NOT NULL, content TEXT NOT NULL, msg_time BIGINT NOT NULL ) """ cursor = conn.cursor() try: cursor.execute(create_table_sql) conn.commit() except jaydebeapi.Error as e: print("创建表异常:", e) cursor.close() return False finally: cursor.close() return True # 插入数据 def insert_data(data): insert_query = ''' INSERT INTO data (chat_name, msg_no, from_cust_id, content, msg_time) VALUES (?, ?, ?, ?, ?); ''' try: conn = create_connection() cursor = conn.cursor() cursor.execute(insert_query, data) conn.commit() except jaydebeapi.Error as e: print("[!] 插入失败:", e) finally: cursor.close() # 查询数据 def query_data(sql): try: conn = create_connection() cursor = conn.cursor() cursor.execute(sql) results = cursor.fetchall() except jaydebeapi.Error as e: print("[!] 查询失败:", e) finally: cursor.close() return results def delete_data(sql): try: conn = create_connection() cursor = conn.cursor() cursor.execute(sql) conn.commit() except jaydebeapi.Error as e: print("[!] 查询失败:", e) finally: cursor.close() if __name__ == "__main__": ################################################ username = '0086' pwd = '' groups = '关键词' #需要踢人的群名字统一关键词,也可以是一个字母符号,如果群名字不带这个则不会踢人 groups_list = { "aaa测试3": "pUalcqhBTcdg44G_j9s5K-XCUF83FRHB", "aaa测试1": "928JLh8AAA65RyryJLbTiPKnV0NJWXIi", "aaa测试2": "Y26osJNBCaOMENv71XMT7CmOT1hYhee9" } ################################################ # 全局变量 call_back_message = None script = None # 用于存储已加载的脚本 unique_data_set = set() # 创建队列 ls_queue = queue.Queue() insert_queue = queue.Queue() msg_no_queue = queue.Queue() # 创建锁 search_queue_lock = threading.Lock() # 连接参数 h2_jdbc_url = "jdbc:h2:./test" h2_user = "test" h2_password = "test" h2_driver_path = "h2-1.4.200.jar" if not os.path.exists(os.path.expanduser("./test.mv.db")): # 数据库文件不存在,创建数据库和表 if create_table(): print("数据库表创建成功!") else: os.remove("./test.mv.db") exit("[!] 数据表创建失败,退出代码!") print("[-] 正在启动应用APP...") session = usb_device() time.sleep(8) #APP启动有加载时间 try: print("[-] 检测状态: 正在检测信息...") # 启动线程模拟函数的同时运行 threading.Thread(target=run1).start() threading.Thread(target=run2).start() threading.Thread(target=run3).start() threading.Thread(target=run4).start() threading.Thread(target=run5).start() threading.Thread(target=run6).start() except KeyboardInterrupt: script.unload() print("[-] 脚本已卸载,程序退出。")
数据库使用了sqlcipher 4.5.3 加密,在尝试用pysqlcipher3库打开sqlite数据库始终连不上,查阅半天资料发现pysqlcipher3库不支持sqlcipher 4.5.3 加密的sqlite数据库;
后面通过 DB Browser 和 SQLiteStudio 2款sqlit数据库管理工具连上了数据库;
DB Browser 不存在Windows下命令行,不过可以在Linux下安装,所以使用 Termux 命令执行APP在安卓系统安装,结果发现源找不到库DB Browser的库,更换源也没辙,官方也没有提供Linux下编译安装包。
SQLiteStudio 直接不支持命令行,不过发现它可以直接通过adb的方式连接上APP中的数据库,但是因为没法通过python调用,所以最后pass掉了。
研究 websocket
这个问题卡了我两三天,最后成功的时候才发现,python 的 websockets 库就是个垃圾,垃圾中的超级垃圾,坑死我了,我不知道它为什么会20秒断开,看看AI怎么评价 websockets 的:
使用 websocket 库成功搞定
import queue import websocket import threading import re import base64 import random import requests import time import json import hashlib from Crypto.Cipher import DES from Crypto.Util.Padding import pad,unpad # # 群ID加密 # def encryptCustId(custId): custId = str(custId) custIdArray = [int(digit) for digit in custId] encrypted_message = encryptByDES(byteToString(custIdArray)) encrypted_message = encrypted_message.replace('/', '_').replace('+', '-') return encrypted_message #加密 def encryptByDES(message, key='klohjmz_'): key = key.encode('utf-8') iv = key message = message.encode('utf-8') cipher = DES.new(key, DES.MODE_CBC, iv) ciphertext = cipher.encrypt(pad(message, DES.block_size)) encrypted_message = base64.b64encode(ciphertext).decode('utf-8') return encrypted_message #解密 def decryptByDES(ciphertext, key='klohjmz_'): key_bytes = key.encode('utf-8') iv_bytes = key.encode('utf-8') cipher = DES.new(key_bytes, DES.MODE_CBC, iv_bytes) decrypted = unpad(cipher.decrypt(base64.b64decode(ciphertext.replace('_', '/').replace('-', '+'))), DES.block_size) return stringify(decrypted) def stringify(a): m = [] for t in a: m.append(str(t & 15)) return ''.join(m) def byteToString(arr): if isinstance(arr, str): return arr str_ = '' i = 0 while i < len(arr): one = format(arr[i], '08b') v = re.match(r'^1+?(?=0)', one) if v and len(one) == 8: bytes_length = len(v.group(0)) store = format(arr[i], '08b')[7 - bytes_length:] for st in range(1, bytes_length): store += format(arr[i + st], '08b')[2:] str_ += chr(int(store, 2)) i += bytes_length - 1 else: str_ += chr(arr[i]) i += 1 return str_ # # 接受数据解析 # def parse_number_data(data): if len(data) == 8: v1 = int.from_bytes(data[0:4], byteorder='little', signed=False) v2 = int.from_bytes(data[4:8], byteorder='little', signed=False) mp = 2 ** 32 x = v2 y = mp v1n = v1 result = x * y + v1n return str(result) v = 0 for i, n in enumerate(data): n = n if n >= 0 else n & 0xff v += n * (256 ** i) return v def parse_string(arr): return uint_to_string(bytearray(arr)) def uint_to_string(uint_array): encoded_string = ''.join(chr(byte) for byte in uint_array) decoded_string = bytes(encoded_string, 'latin-1').decode('utf-8') return decoded_string # # 登录 # pfcCounter = 0 def next_message_num(): global pfcCounter pfcCounter += 1 pfc = pfcCounter result = [] while pfc > 0: temp = pfc pfc = pfc // 256 result.append(temp - pfc * 256) while len(result) < 8: result.append(0) result[3] = random.randint(1, 9) result[4] = random.randint(1, 9) result[5] = random.randint(1, 9) result[6] = random.randint(1, 9) result[7] = random.randint(1, 9) return result def current_times(): timestamp = int(time.time()) result = [0] * 8 i = 7 while timestamp > 0: temp = timestamp timestamp = timestamp // 256 result[i] = temp - timestamp * 256 i -= 1 return result def calc_checksum(buffer, buffer_len): if buffer is None: return [0, 0, 0, 0] tmp = [0, 0, 0, 0] dest = [0, 0, 0, 0] len = 0 offset = 0 while len < buffer_len: if len + 4 > buffer_len: offset = buffer_len - len else: offset = 4 for i in range(offset): tmp[i] ^= buffer[len + i] len += 4 for i in range(4): dest[i] = tmp[i] return dest def split_cust_id(cust_id): result = [0] * 8 i = 0 while cust_id > 0: temp = cust_id cust_id = cust_id // 256 result[i] = temp - cust_id * 256 i += 1 return result def split_auth(auth): result = [ord(char) for char in auth] if len(result) < 32: for i in range(len(result), 32): result.append(0) return result def md5_hash(text): md5_hasher = hashlib.md5() md5_hasher.update(text.encode('utf-8')) hashed_text = md5_hasher.hexdigest() return hashed_text def md5Psw(account, pwd): hashed_pwd = md5_hash( md5_hash(pwd) + account + 'MOSGRAM') return hashed_pwd def login(username,pwd): global open_id_qj,im_token,token data = { "username": f"{username}", "password": md5Psw(username, pwd), "device_type":"WEB" } headers = { "Content-Type":"application/json; charset=UTF-8", "X-Requested-With":"XMLHttpRequest", "App_id":"qiyunxin", "timestamp": str(int(time.time() * 1000)), "sign": md5_hash(str(int(time.time() * 1000)) + "WEB"), "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36" } try: print('[信息] WEB端登录中...') response = requests.post('http://api.vvchat.im/userservices/v2/user/login', headers=headers,data=json.dumps(data)) data = response.json() if 'err_msg' in data: exit('[信息] WEB端登录失败: %s!' % data['err_msg']) elif 'open_id' in data: print('[信息] WEB端成功!') open_id_qj = data['open_id'] im_token = data['im_token'] token = data['token'] except requests.exceptions.RequestException as e: print('[信息] WEB端登录出错:%s'%(e)) except Exception as e: print('[信息] WEB端登录异常报错:%s'%(e)) def revoke_msg(msg_no,groups_id,group_name): print(f"[巡查] 开始撤回群 {group_name} 消息编号 {msg_no} 消息!") data = {"msg_no": msg_no,"msg_time": int(time.time()) * 1000} headers = {"Content-Type":"application/json; charset=UTF-8","X-Requested-With":"XMLHttpRequest","App_id":"qiyunxin","User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36","Authorization":token} try: response = requests.post(f'http://api.vvchat.im/groupapi/v1/users/{open_id_qj}/groups/{groups_id}/revoke_msg', headers=headers,data=json.dumps(data)) result = response.json() if result['err_msg'] == 'OK': print(f"[巡查] 群 {group_name} 消息编号 {msg_no} 撤回成功!") else: print(f"[巡查] 群 {group_name} 消息编号 {msg_no} 撤回失败,原因:{result['err_msg']} 请检查...") except requests.exceptions.RequestException as e: print('[巡查] 群 %s 消息编号 %s 撤回出错:%s ,重试一次...'%(group_name,msg_no,e)) revoke_msg(msg_no,groups_id) except Exception as e: print('[巡查] 群 %s 消息编号 %s 撤回异常报错:%s ,重试一次...'%(group_name,msg_no,e)) revoke_msg(msg_no,groups_id) def delete_member(from_cust_id,groups_id): group_name = groups_name(groups_id) print(f"[巡查] 开始剔除群 {group_name} 成员 {from_cust_id}") from_cust_id_encrypt = encryptCustId(from_cust_id) data = [{"name":"default","open_id": from_cust_id_encrypt,"cust_id": from_cust_id}] headers = {"Content-Type":"application/json; charset=UTF-8","X-Requested-With":"XMLHttpRequest","App_id":"qiyunxin","User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36","Authorization":token} try: response = requests.delete(f'http://api.vvchat.im/groupapi/v1/users/{open_id_qj}/groups/{groups_id}/members', headers=headers,json=data) result = response.json() if result['err_msg'] == 'OK': print(f"[巡查] 群 {group_name} 成员 {from_cust_id} 剔除成功!") else: print(f"[巡查] 群 {group_name} 成员 {from_cust_id} 剔除失败,原因:{result['err_msg']} 请检查...") except requests.exceptions.RequestException as e: print('[巡查] 群 %s 成员 %s 剔除出错:%s,重试一次...'%(group_name,from_cust_id,e)) delete_member(from_cust_id,groups_id) except Exception as e: print('[巡查] 群 %s 成员 %s 剔除异常报错:%s,重试一次...'%(group_name,from_cust_id,e)) delete_member(from_cust_id,groups_id) def groups_name(groups_id): headers = {"App_id":"qiyunxin","User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36","Authorization":token} try: response = requests.get(f'http://api.vvchat.im/groupapi/v1/users/{open_id_qj}/groups/{groups_id}', headers=headers) result = response.json() if result: return result['group_name'] except requests.exceptions.RequestException as e: print('[巡查] 获取群名称 出错:%s'%(e)) groups_name(groups_id) except Exception as e: print('[巡查] 获取群名称 异常报错:%s'%(e)) groups_name(groups_id) #登录包 def wss_login_data(from_cust_id,im_token): from_cust_id = split_cust_id(int(decryptByDES(from_cust_id))) auth = split_auth(im_token) msg_no = next_message_num() buffer = [] msg_type = [1, 0] body = [] body += login_type + from_cust_id + auth checkCode = calc_checksum(body, len(body)) body += checkCode + end buffer += start + length + version + msg_no + msg_type + body len_body = len(body) buffer[1] = len_body - ((len_body >> 16) << 16) buffer[2] = len_body >> 8 buffer[3] = len_body >> 16 buffer[4] = 0 return buffer #心跳包 def wss_xt_data(open_id): buffer = [] from_cust_id = split_cust_id(int(decryptByDES(open_id))) msg_no = next_message_num() msg_type = [6, 0] body = [] body += login_type + from_cust_id checkCode = calc_checksum(body, len(body)) body += checkCode + end buffer += start + length + version + msg_no + msg_type + body len_body = len(body) buffer[1] = len_body - ((len_body >> 16) << 16); buffer[2] = len_body >> 8; buffer[3] = len_body >> 16; buffer[4] = 0; return buffer #消息检测 def check_and_clean_data(): global msg_queue wg_list = [] msg_list = [] while True: if not msg_queue.empty(): for _ in range(4): item = msg_queue.get() msg_list.append(item) print("[巡查] 巡查检测中...") matching_data = [item for item in msg_list if item[0] == msg_list[-1][0] and item[1] == msg_list[-1][1] and item[3] == msg_list[-1][3]] if len(matching_data) >= 3:# 三者相同的存在大于等于三条 timestamps = [item[4] for item in matching_data] #获取时间字段 time_diffs = [int(timestamps[i + 1]) - int(timestamps[i]) for i in range(len(timestamps) - 1)] for diff, data in zip(time_diffs, matching_data[1:]): if diff == 0 or diff <= 3: for x in msg_list: if data[0] == x[0]: wg_list.append(x) matching_data = [] if wg_list: # 使用集合去重,通过将子列表转换为元组 unique_list = list(set(tuple(sub_list) for sub_list in wg_list)) # 将元组转换回列表 unique_nested_list = [list(sub_tuple) for sub_tuple in unique_list] from_cust_ids = set() # 用于存储去重后的 from_cust_id for group in unique_nested_list: if group: # 确保列表不为空 from_cust_ids.add((group[0], group[1])) for user in from_cust_ids: #踢人 print(f"[巡查] 检测到群: {groups_name(user[1])} 出现违规者:{user[0]}!") delete_member(x[0],x[1]) for msg_no in msg_list: for i in from_cust_ids: if msg_no[0] == i[0] and msg_no[1] == i[1]: #获取群名称 group_name = groups_name(msg_no[1]) #撤回 revoke_msg(msg_no[2],msg_no[1],group_name) wg_list = [] msg_list = [] else: print("[巡查] 消息未检测到频繁!") #服务端传输数据类型判断 def eventReceiveMessage(ws,msg): global msg_queue msg_type = parse_number_data(msg[14:16]) status = parse_number_data(msg[25:29]) if msg_type == 2: if status == 200: print('[信息] websocket 端登录成功,开始监听信息...') # 创建发送心跳包的线程 heartbeat_thread = threading.Thread(target=heartbeat, args=(ws,)) heartbeat_thread.daemon = True heartbeat_thread.start() # 创建消息处理线程 msg_thread = threading.Thread(target=check_and_clean_data) msg_thread.daemon = True msg_thread.start() elif status == 401: print('[信息] websocket 端登录失败!') ws.close() elif msg_type == 5: chatType = parse_number_data(msg[16:17]) content = parse_string(msg[61:61+parse_number_data(msg[57:61])]) content_json = json.loads(content)['content'] msgNo = parse_number_data(msg[6:14]) groups_id = encryptCustId(parse_number_data(msg[25:33])) msgTime = parse_number_data(msg[17:25]) from_cust_id = parse_number_data(msg[33:41]) data = [from_cust_id,groups_id,msgNo,content_json,msgTime] if chatType == 2: print(f"[消息] 发送者:{from_cust_id} 发自群:{groups_name(groups_id)} 发送时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(msgTime)))}") msg_queue.put(data) elif msg_type == 6: print("[信息] websocket 端心跳正常!") else: pass def heartbeat(ws): while True: try: print(f'[信息] 发送心跳推送...') ws.send(bytes(wss_xt_data(open_id_qj))) time.sleep(15) except: print("[信息] 心跳推送失败!") ws.close() def on_error(ws, error): print(f"[错误] {error}") def on_close(ws, close_status_code, close_msg): print("[信息] websocket 端连接已断开,开始重新启动...") run() def on_message(ws, message): eventReceiveMessage(ws,message) def on_open(ws): print("[信息] websocket 端已连接!") login(username,password) print("[信息] websocket 端登录中...") ws.send(bytes(wss_login_data(open_id_qj,im_token))) def run(): try: # 创建WebSocket连接 print("[信息] websocket 端连接状态检测...") ws = websocket.WebSocketApp(url, on_message=on_message, on_open=on_open,on_error=on_error,on_close=on_close) # 启动WebSocket连接 ws.run_forever() except: print("[信息] websocket 端连接启动失败!") if __name__ == "__main__": global username,password ###############配置专区############### username = '0086' #0086+手机号 password = '' url = "wss://im.mlskm.com/ws8" ###############配置专区############### #全局变量定义 msg_queue = queue.Queue() start = [2] version = [1] end = [3] length = [0, 0, 0, 0] login_type = [2] #启动 run()
- 入手学习到了逆向脱壳,以及 app hook 方面的知识。
- 对安卓中的Java代码更加熟悉。
- 入手了 sqlite 数据库及 sqlcipher 加密方式方面的知识。
- 入手了socket、websocket方面的知识。
- 对python更加熟练。