瀏覽代碼

18MiB of Pythonic Rage

master
Emil Williams 3 月之前
父節點
當前提交
93f77609cf
沒有發現已知的金鑰在資料庫的簽署中 GPG Key ID: 5432DB986FDBCF8A
共有 1 個文件被更改,包括 77 次插入0 次删除
  1. +77
    -0
      client/moontalk.py

+ 77
- 0
client/moontalk.py 查看文件

@@ -0,0 +1,77 @@
# Copyright 2024 Anonymous
# Licensed under the terms of MIT+NIGGER
# Notable issues:
# No word wrap in the result box. It's a listbox and should probably be a textbox of some kind.
# When it is closed it shits into the console like no tomorrow, too bad.
# Overall this is a disgusting hack of python garbage

import asyncio
from tkinter import *
from tkinter.ttk import *

running = True
root = Tk()
root.option_add("*Font", 'TkFixedFont')

def tk_kill(): # no good, nasty way to stop everything
global running
running = False
root.destroy()

root.protocol("WM_DELETE_WINDOW", tk_kill)
root.geometry("700x400")
root.rowconfigure(tuple(range(11)), weight=3) # needed for css-like grid behavior
root.columnconfigure(tuple(range(6)), weight=3)

message_log = Listbox(root)
message_log.grid(rowspan=9, row=0, column=0, columnspan=6, sticky='nsew')

input_var = StringVar()
input_box = Entry(root, textvariable=input_var)
input_box.grid(row=10, column=0, columnspan=5, sticky='nsew')


def tk_send_message():
content = input_var.get()
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(outbound_queue.put(content), loop)
input_var.set("")

submit_button = Button(root, text = 'Submit', command = tk_send_message)
submit_button.grid(row=10, column=5, columnspan=1, sticky='nsew')

# potential improvement: bind enter on input_box to submit as well

outbound_queue = asyncio.Queue()

def add_message(content: str):
message_log.insert(END, content.strip())

async def consume_message(reader: asyncio.StreamReader):
while not reader.at_eof():
line = await reader.readline()
add_message(line.decode('ascii'))

async def submit_message(writer: asyncio.StreamWriter):
while True:
content: str = await outbound_queue.get()
content = content + "\n" # all messages must newline
writer.write(content.encode('ascii'))
add_message(f"YOU: {content}") # echo it back
outbound_queue.task_done()

async def update_tk():
while running:
root.update()
await asyncio.sleep(0.01)
raise asyncio.CancelledError

async def main():
reader, writer = await asyncio.open_connection(
"7ks473deh6ggtwqsvbqdurepv5i6iblpbkx33b6cydon3ajph73sssad.onion",
50000
)
await asyncio.gather(update_tk(), consume_message(reader), submit_message(writer))

if __name__ == "__main__":
asyncio.run(main())

Loading…
取消
儲存