78 lines
2.4 KiB
Python
78 lines
2.4 KiB
Python
# Copyright 2024 Anonymous
|
|
# Licensed under the terms of MIT+NIGGER
|
|
# Notable issues:
|
|
# No word wrap in the result box. It's a listbox and should probably be a textbox of some kind.
|
|
# When it is closed it shits into the console like no tomorrow, too bad.
|
|
# Overall this is a disgusting hack of python garbage
|
|
|
|
import asyncio
|
|
from tkinter import *
|
|
from tkinter.ttk import *
|
|
|
|
running = True
|
|
root = Tk()
|
|
root.option_add("*Font", 'TkFixedFont')
|
|
|
|
def tk_kill(): # no good, nasty way to stop everything
|
|
global running
|
|
running = False
|
|
root.destroy()
|
|
|
|
root.protocol("WM_DELETE_WINDOW", tk_kill)
|
|
root.geometry("700x400")
|
|
root.rowconfigure(tuple(range(11)), weight=3) # needed for css-like grid behavior
|
|
root.columnconfigure(tuple(range(6)), weight=3)
|
|
|
|
message_log = Listbox(root)
|
|
message_log.grid(rowspan=9, row=0, column=0, columnspan=6, sticky='nsew')
|
|
|
|
input_var = StringVar()
|
|
input_box = Entry(root, textvariable=input_var)
|
|
input_box.grid(row=10, column=0, columnspan=5, sticky='nsew')
|
|
|
|
|
|
def tk_send_message():
|
|
content = input_var.get()
|
|
loop = asyncio.get_running_loop()
|
|
asyncio.run_coroutine_threadsafe(outbound_queue.put(content), loop)
|
|
input_var.set("")
|
|
|
|
submit_button = Button(root, text = 'Submit', command = tk_send_message)
|
|
submit_button.grid(row=10, column=5, columnspan=1, sticky='nsew')
|
|
|
|
# potential improvement: bind enter on input_box to submit as well
|
|
|
|
outbound_queue = asyncio.Queue()
|
|
|
|
def add_message(content: str):
|
|
message_log.insert(END, content.strip())
|
|
|
|
async def consume_message(reader: asyncio.StreamReader):
|
|
while not reader.at_eof():
|
|
line = await reader.readline()
|
|
add_message(line.decode('ascii'))
|
|
|
|
async def submit_message(writer: asyncio.StreamWriter):
|
|
while True:
|
|
content: str = await outbound_queue.get()
|
|
content = content + "\n" # all messages must newline
|
|
writer.write(content.encode('ascii'))
|
|
add_message(f"YOU: {content}") # echo it back
|
|
outbound_queue.task_done()
|
|
|
|
async def update_tk():
|
|
while running:
|
|
root.update()
|
|
await asyncio.sleep(0.01)
|
|
raise asyncio.CancelledError
|
|
|
|
async def main():
|
|
reader, writer = await asyncio.open_connection(
|
|
"7ks473deh6ggtwqsvbqdurepv5i6iblpbkx33b6cydon3ajph73sssad.onion",
|
|
50000
|
|
)
|
|
await asyncio.gather(update_tk(), consume_message(reader), submit_message(writer))
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|